Repeated messages when syncing from source to target and then back from target to source

Hi everyone,
I have two Kafka clusters, and I am trying to sync data between them for use in a failover/switchover scenario.
For this I am using a Kafka Connect distributed server, on which I create the MM2 connectors (source, checkpoint and heartbeat) through the connector REST APIs.
First, I synced from the source (dev-7) to the target (dev-5); all topics were replicated, and the consumer groups as well.
Then I stopped this Connect server and redeployed it with a new configuration to replicate from dev-5 to dev-7 (the reverse of the earlier run).
I have three concerns here:

  1. Why is the heartbeats topic that was created on dev-7 during the first run getting replicated to dev-5 in the second run, even though I have listed it in the topics.exclude property?
  2. For a topic on dev-7, when I check its messages after the second run, the topic has twice as many messages, and the offsets have doubled as well.
  3. For a consumer group and topic replicated from dev-7 to dev-5, if we consume more messages on the target cluster, the lag and offsets for this consumer group no longer stay in sync with the source.

These are my configurations:
connect-distributed.properties: |
bootstrap.servers=dev5server:9094
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=60000

mirror-checkpoint-connector.json: |
{
“name”: “mirror-checkpoint-connector”,
“config”: {
“connector.class”: “org.apache.kafka.connect.mirror.MirrorCheckpointConnector”,
“source.cluster.alias”: “dev-7”,
“target.cluster.alias”: “dev-5”,
“source.cluster.bootstrap.servers”: “dev7server:9094”,
“target.cluster.bootstrap.servers”: “dev5server:9094”,
“tasks.max”: “1”,
“emit.checkpoints.enabled”: “true”,
“sync.group.offsets.enabled”: “true”,
“emit.checkpoints.interval.seconds”: “60”,
“sync.group.offsets.interval.seconds”: “60”,
“refresh.groups.enabled”: “true”,
“refresh.groups.interval.seconds”: “600”,
“replication.policy.class”: “org.apache.kafka.connect.mirror.IdentityReplicationPolicy”,
“key.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”,
“value.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”
}
}
mirror-heartbeat-connector.json: |
{
“name”: “mirror-heartbeat-connector”,
“config”: {
“connector.class”: “org.apache.kafka.connect.mirror.MirrorHeartbeatConnector”,
“source.cluster.alias”: “dev-7”,
“target.cluster.alias”: “dev-5”,
“source.cluster.bootstrap.servers”: “dev7server:9094”,
“target.cluster.bootstrap.servers”: “dev5server:9094”,
“tasks.max”: “1”,
“replication.policy.class”: “org.apache.kafka.connect.mirror.IdentityReplicationPolicy”,
“key.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”,
“value.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”,
“emit.heartbeats.enabled”: “true”,
“emit.heartbeats.interval.seconds”: “1”
}
}
mirror-source-connector.json: |
{
“name”: “mirror-source-connector”,
“config”: {
“connector.class”: “org.apache.kafka.connect.mirror.MirrorSourceConnector”,
“source.cluster.alias”: “dev-7”,
“target.cluster.alias”: “dev-5”,
“source.cluster.bootstrap.servers”: “dev7server:9094”,
“target.cluster.bootstrap.servers”: “dev5server:9094”,
"topics": ".*",
"topics.exclude": "heartbeats|connect-.*|.*[-.]internal|.*\\.replica|__.*",
“tasks.max”: “1”,
“auto.offset.reset”: “earliest”,
“replication.policy.class”: “org.apache.kafka.connect.mirror.IdentityReplicationPolicy”,
“key.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”,
“value.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”,
“sync.topic.configs.enabled”: “true”,
“sync.topic.acls.enabled”: “true”,
“refresh.topics.enabled”: “true”,
“refresh.topics.interval.seconds”: “600”,
“sync.topic.configs.interval.seconds”: “600”,
“sync.topic.acls.interval.seconds”: “600”,
“replication.factor”: “3”
}
}

For the reverse run, from dev-5 to dev-7:

connect-distributed.properties: |
bootstrap.servers=dev7server:9094
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=60000
mirror-checkpoint-connector.json: |
{
“name”: “mirror-checkpoint-connector”,
“config”: {
“connector.class”: “org.apache.kafka.connect.mirror.MirrorCheckpointConnector”,
“source.cluster.alias”: “dev-5”,
“target.cluster.alias”: “dev-7”,
“source.cluster.bootstrap.servers”: “dev5server:9094”,
“target.cluster.bootstrap.servers”: “dev7server:9094”,
“tasks.max”: “1”,
“emit.checkpoints.enabled”: “true”,
“sync.group.offsets.enabled”: “true”,
“emit.checkpoints.interval.seconds”: “60”,
“sync.group.offsets.interval.seconds”: “60”,
“refresh.groups.enabled”: “true”,
“refresh.groups.interval.seconds”: “600”,
“replication.policy.class”: “org.apache.kafka.connect.mirror.IdentityReplicationPolicy”,
“key.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”,
“value.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”
}
}
mirror-heartbeat-connector.json: |
{
“name”: “mirror-heartbeat-connector”,
“config”: {
“connector.class”: “org.apache.kafka.connect.mirror.MirrorHeartbeatConnector”,
“source.cluster.alias”: “dev-5”,
“target.cluster.alias”: “dev-7”,
“source.cluster.bootstrap.servers”: “dev5server:9094”,
“target.cluster.bootstrap.servers”: “dev7server:9094”,
“tasks.max”: “1”,
“replication.policy.class”: “org.apache.kafka.connect.mirror.IdentityReplicationPolicy”,
“key.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”,
“value.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”,
“emit.heartbeats.enabled”: “true”,
“emit.heartbeats.interval.seconds”: “1”
}
}
mirror-source-connector.json: |
{
“name”: “mirror-source-connector”,
“config”: {
“connector.class”: “org.apache.kafka.connect.mirror.MirrorSourceConnector”,
“source.cluster.alias”: “dev-5”,
“target.cluster.alias”: “dev-7”,
“source.cluster.bootstrap.servers”: “dev5server:9094”,
"target.cluster.bootstrap.servers": "dev7server:9094",
"topics": ".*",
"topics.exclude": "heartbeats|connect-.*|.*[-.]internal|.*\\.replica|__.*",
“tasks.max”: “1”,
“auto.offset.reset”: “earliest”,
“replication.policy.class”: “org.apache.kafka.connect.mirror.IdentityReplicationPolicy”,
“key.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”,
“value.converter”: “org.apache.kafka.connect.converters.ByteArrayConverter”,
“sync.topic.configs.enabled”: “true”,
“sync.topic.acls.enabled”: “true”,
“refresh.topics.enabled”: “true”,
“refresh.topics.interval.seconds”: “600”,
“sync.topic.configs.interval.seconds”: “600”,
“sync.topic.acls.interval.seconds”: “600”,
“replication.factor”: “3”
}
}

The Kafka version is 3.8.0 for Connect, dev-5 and dev-7 alike.
Please suggest the changes needed to fix this.

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.