Topic message headers getting encoded during sync using MirrorMaker

I am trying to sync Kafka data from a source to a target Kafka cluster using Kafka Connect hosted on the target cluster (target as bootstrap server), running the MirrorMaker connectors on it with the configuration below. The topics at the source are created by a producer service with the default partition count of 3, and when we produce and read messages everything comes through correctly. However, after we increase the partition count on a particular topic using the alter-topic command, a message produced at the source arrives at the target with encoded headers: the header keys are unchanged, but the values are encoded, while the source still shows plain headers. Is there some header-encoding configuration to be done, is this behavior caused by the manual partition-count increase, or is it something else?
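If the target header values look like base64 text (Connect's JsonConverter, for example, serializes raw bytes as base64 strings in JSON), one way to confirm is to decode a value and check it against what the source producer set. A minimal sketch with a hypothetical header value:

```python
import base64

# Hypothetical header value as observed on the target topic: the original
# plain string appears to have been base64-encoded in transit.
observed = "dHJhY2UtMTIzNDU="

# Decoding recovers the value the source producer actually set.
original = base64.b64decode(observed).decode("utf-8")
print(original)  # trace-12345
```

If the decoded bytes match the source header exactly, the values are being re-serialized by a converter on the mirroring path rather than copied byte-for-byte.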

MirrorMaker and Connect configuration:
metadata:
  name: mirrormaker2-config
data:
  connect-distributed.properties: |
    bootstrap.servers={{TARGET_KAFKA_HOST}}
    group.id=connect-cluster
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    offset.flush.interval.ms=60000

mirror-checkpoint-connector.json: |
  {
    "name": "mirror-checkpoint-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
      "source.cluster.alias": "{{SOURCE_CLUSTER_ALIAS}}",
      "target.cluster.alias": "{{TARGET_CLUSTER_ALIAS}}",
      "source.cluster.bootstrap.servers": "{{SOURCE_KAFKA_HOST}}",
      "target.cluster.bootstrap.servers": "{{TARGET_KAFKA_HOST}}",
      "tasks.max": "1",
      "emit.checkpoints.enabled": "true",
      "sync.group.offsets.enabled": "true",
      "emit.checkpoints.interval.seconds": "60",
      "sync.group.offsets.interval.seconds": "60",
      "refresh.groups.enabled": "true",
      "refresh.groups.interval.seconds": "600",
      "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
    }
  }
mirror-heartbeat-connector.json: |
  {
    "name": "mirror-heartbeat-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
      "source.cluster.alias": "{{SOURCE_CLUSTER_ALIAS}}",
      "target.cluster.alias": "{{TARGET_CLUSTER_ALIAS}}",
      "source.cluster.bootstrap.servers": "{{SOURCE_KAFKA_HOST}}",
      "target.cluster.bootstrap.servers": "{{TARGET_KAFKA_HOST}}",
      "tasks.max": "1",
      "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "emit.heartbeats.enabled": "true",
      "emit.heartbeats.interval.seconds": "1"
    }
  }
mirror-source-connector.json: |
  {
    "name": "mirror-source-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "{{SOURCE_CLUSTER_ALIAS}}",
      "target.cluster.alias": "{{TARGET_CLUSTER_ALIAS}}",
      "source.cluster.bootstrap.servers": "{{SOURCE_KAFKA_HOST}}",
      "target.cluster.bootstrap.servers": "{{TARGET_KAFKA_HOST}}",
      "topics": ".*",
      "topics.exclude": "heartbeats|connect-.*|.*[-.]internal|.*\\.replica|__.*",
      "tasks.max": "1",
      "auto.offset.reset": "earliest",
      "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "sync.topic.configs.enabled": "true",
      "sync.topic.acls.enabled": "true",
      "refresh.topics.enabled": "true",
      "refresh.topics.interval.seconds": "600",
      "sync.topic.configs.interval.seconds": "600",
      "sync.topic.acls.interval.seconds": "600",
      "replication.factor": "3"
    }
  }

Possibly you’re hitting this bug? If so, try the guidance here to explicitly make header conversion a no-op:

header.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Should I add this property to the Connect cluster configuration, to the connectors (source, heartbeat, and checkpoint), or both?

Only add it to the connector configs. While certain globally applicable configs (i.e., those that apply to all connectors) do get passed down from the worker config to connectors (e.g., the underlying Kafka client producer.* and consumer.* configs), non-global configs like MM2’s header.converter wouldn’t pass through from the worker to the connectors.
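For example, the mirror-source-connector config above would gain one line inside its "config" block (the same line would go into the checkpoint and heartbeat connector configs); this is a fragment, not a complete config:

```json
{
  "name": "mirror-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```

With ByteArrayConverter as the header converter, header values are copied byte-for-byte instead of being re-serialized.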

Here, it mentions the property header.converter in the Connect configuration as well as in the connector configuration.

Oh, I didn’t realize it lives there too; it should work either way then. I would personally put it only in the connectors, since the decision is specifically a workaround for MM2’s bug. That also keeps the connector config safe across Connect clusters, allows it to coexist with other connectors that want the default converter, etc. I’m guessing those are stretch scenarios for you, though, so go with whatever your preference is and test it out.