I am trying to sync Kafka data from a source to a target Kafka cluster, using Kafka Connect hosted on the target cluster (with the target as the bootstrap server) and running the MirrorMaker connectors on it with the configuration below. The topics at the source are created by a producer service with the default partition count of 3, and when we produce and read messages they come through correctly. But after we increase the partition count on a particular topic using the alter-topic command, if we produce a message at the source and read it on the target, the header values on the replicated message are encoded — the header keys are non-encoded, but the values are encoded — while the message at the source has non-encoded headers. Is there a header-encoding configuration we need to set, is this behavior caused by the manual partition-count increase, or is something else going on?
Mirrormaker and connect-configuration:
# ConfigMap holding the Connect worker properties plus the three MirrorMaker 2
# connector definitions (source, checkpoint, heartbeat).
#
# Two fixes applied relative to the pasted original:
#  1. All connector JSON used Unicode smart quotes (“ ”), which strict JSON
#     parsers — including the Connect REST API — reject; replaced with ".
#  2. header.converter was not set anywhere, so Connect fell back to the default
#     SimpleHeaderConverter, which parses and re-serializes header VALUES (keys
#     pass through untouched). That matches the reported symptom exactly
#     ("header keys are non-encoded, but the values are encoded"). It is now set
#     to ByteArrayConverter, consistent with the existing key/value converters,
#     so header bytes are replicated verbatim.
#     NOTE(review): the partition-count increase is presumably coincidental
#     timing, not the cause — verify after applying the header.converter fix.
metadata:
  name: mirrormaker2-config
data:
  connect-distributed.properties: |
    bootstrap.servers={{TARGET_KAFKA_HOST}}
    group.id=connect-cluster
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    # Replicate record headers byte-for-byte. Without this, the worker default
    # SimpleHeaderConverter re-serializes header values on the way through.
    header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    # NOTE(review): the internal.* converter settings below are deprecated
    # (ignored since Kafka 2.0, removed in 3.0) — safe to drop on current
    # versions; kept here only for compatibility with older workers.
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    offset.flush.interval.ms=60000
  mirror-checkpoint-connector.json: |
    {
      "name": "mirror-checkpoint-connector",
      "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "source.cluster.alias": "{{SOURCE_CLUSTER_ALIAS}}",
        "target.cluster.alias": "{{TARGET_CLUSTER_ALIAS}}",
        "source.cluster.bootstrap.servers": "{{SOURCE_KAFKA_HOST}}",
        "target.cluster.bootstrap.servers": "{{TARGET_KAFKA_HOST}}",
        "tasks.max": "1",
        "emit.checkpoints.enabled": "true",
        "sync.group.offsets.enabled": "true",
        "emit.checkpoints.interval.seconds": "60",
        "sync.group.offsets.interval.seconds": "60",
        "refresh.groups.enabled": "true",
        "refresh.groups.interval.seconds": "600",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
      }
    }
  mirror-heartbeat-connector.json: |
    {
      "name": "mirror-heartbeat-connector",
      "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "source.cluster.alias": "{{SOURCE_CLUSTER_ALIAS}}",
        "target.cluster.alias": "{{TARGET_CLUSTER_ALIAS}}",
        "source.cluster.bootstrap.servers": "{{SOURCE_KAFKA_HOST}}",
        "target.cluster.bootstrap.servers": "{{TARGET_KAFKA_HOST}}",
        "tasks.max": "1",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "emit.heartbeats.enabled": "true",
        "emit.heartbeats.interval.seconds": "1"
      }
    }
  # header.converter is also overridden per-connector here (KIP-290) so header
  # bytes pass through verbatim even if the worker-level default changes.
  mirror-source-connector.json: |
    {
      "name": "mirror-source-connector",
      "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "source.cluster.alias": "{{SOURCE_CLUSTER_ALIAS}}",
        "target.cluster.alias": "{{TARGET_CLUSTER_ALIAS}}",
        "source.cluster.bootstrap.servers": "{{SOURCE_KAFKA_HOST}}",
        "target.cluster.bootstrap.servers": "{{TARGET_KAFKA_HOST}}",
        "topics": ".*",
        "topics.exclude": "heartbeats|connect-.*|.*[-.]internal|.*\\.replica|__.*",
        "tasks.max": "1",
        "auto.offset.reset": "earliest",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "sync.topic.configs.enabled": "true",
        "sync.topic.acls.enabled": "true",
        "refresh.topics.enabled": "true",
        "refresh.topics.interval.seconds": "600",
        "sync.topic.configs.interval.seconds": "600",
        "sync.topic.acls.interval.seconds": "600",
        "replication.factor": "3"
      }
    }