MirrorMaker Endless loop

Hello All,
This is my first time setting up MM2 to replicate from one MSK cluster to another, following this blog: Use MSK Connect for managed MirrorMaker 2 deployment with IAM authentication | AWS Big Data Blog
But for some reason it is creating a circular loop. It replicates topic test in MSK 1 to topic source.test in MSK 2, but then it replicates source.test in MSK 2 to source.source.test in MSK 2, and then back again to source.source.source.test, and so on. I'm really confused about why it's doing this. I've tried adding source.* to the topics.exclude list.
I also tried:
source->target.enabled: true
target->source.enabled: false
That doesn't seem to be working either.
Using KafkaConnectVersion: 2.7.1
Kafka cluster version: 2.8.1
Any ideas?
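
For anyone checking whether the exclude list itself is at fault, here is a minimal Python sketch of how such a list might be evaluated. It assumes the comma-separated entries are joined into a single alternation and matched against the whole topic name, which is my understanding of MM2's default topic filter; the helper names `compile_pattern_list` and `is_excluded` are made up for this illustration, and Python's `re` is only standing in for Java regex here.

```python
import re

# Exclude list copied from the MirrorSourceConnector config in this post.
TOPICS_EXCLUDE = (
    ".*[-.]internal, .*.replica, __.*, .*-config, "
    ".*-status, .*-offset, source.*, heartbeats"
)


def compile_pattern_list(csv: str) -> re.Pattern:
    """Join comma-separated regexes into one alternation (assumed behaviour)."""
    parts = [p.strip() for p in csv.split(",") if p.strip()]
    return re.compile("|".join(f"(?:{p})" for p in parts))


def is_excluded(topic: str, pattern: re.Pattern) -> bool:
    """Whole-name match: 'source.*' only hits names that start with 'source'."""
    return pattern.fullmatch(topic) is not None


pattern = compile_pattern_list(TOPICS_EXCLUDE)
for topic in ["test", "source.test", "source.source.test", "my.source.test"]:
    print(topic, is_excluded(topic, pattern))
```

If the filter really behaves like this sketch, source.test and source.source.test are already excluded by source.*, so the loop is probably not caused by the pattern itself but by something else in how the connector is deployed.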

MirrorSourceConnector:

connector.class: org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max: 10
clusters: source,target
source.cluster.alias: source
target.cluster.alias: target
source->target.enabled: true
target->source.enabled: false
source.cluster.bootstrap.servers: <SOURCE_BOOTSTRAP_URLS>
source.cluster.producer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.producer.security.protocol: SASL_SSL
source.cluster.producer.sasl.mechanism: AWS_MSK_IAM
source.cluster.producer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
source.cluster.consumer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.consumer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
source.cluster.consumer.security.protocol: SASL_SSL
source.cluster.consumer.sasl.mechanism: AWS_MSK_IAM
source.cluster.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
source.cluster.sasl.mechanism: AWS_MSK_IAM
source.cluster.security.protocol: SASL_SSL
source.cluster.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.bootstrap.servers: <TARGET_BOOTSTRAP_URLS>
target.cluster.security.protocol: SASL_SSL
target.cluster.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
target.cluster.producer.sasl.mechanism: AWS_MSK_IAM
target.cluster.producer.security.protocol: SASL_SSL
target.cluster.producer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
target.cluster.producer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.consumer.security.protocol: SASL_SSL
target.cluster.consumer.sasl.mechanism: AWS_MSK_IAM
target.cluster.consumer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.consumer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
target.cluster.sasl.mechanism: AWS_MSK_IAM
target.cluster.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
refresh.groups.enabled: true
refresh.groups.interval.seconds: 60
refresh.topics.interval.seconds: 60
source->target.topics: .*
topics.exclude: .*[-.]internal, .*.replica, __.*, .*-config, .*-status, .*-offset, source.*, heartbeats
emit.checkpoints.enabled: true
topics: .*
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
producer.max.block.ms: 10000
producer.linger.ms: 500
producer.retry.backoff.ms: 1000
sync.topic.configs.enabled: true
sync.topic.configs.interval.seconds: 60
refresh.topics.enabled: true
groups.exclude: console-consumer-.*, connect-.*, __.*
consumer.auto.offset.reset: earliest
replication.factor: 3

MirrorHeartbeatConnector:

connector.class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
source.cluster.alias: source
target.cluster.alias: target
clusters: source,target
source.cluster.bootstrap.servers: <SOURCE_BOOTSTRAP_URL>
target.cluster.security.protocol: SASL_SSL
target.cluster.producer.security.protocol: SASL_SSL
target.cluster.consumer.security.protocol: SASL_SSL
target.cluster.sasl.mechanism: AWS_MSK_IAM
target.cluster.producer.sasl.mechanism: AWS_MSK_IAM
target.cluster.consumer.sasl.mechanism: AWS_MSK_IAM
target.cluster.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
target.cluster.producer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
target.cluster.consumer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
target.cluster.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.producer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.consumer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.security.protocol: SASL_SSL
source.cluster.producer.security.protocol: SASL_SSL
source.cluster.consumer.security.protocol: SASL_SSL
source.cluster.sasl.mechanism: AWS_MSK_IAM
source.cluster.producer.sasl.mechanism: AWS_MSK_IAM
source.cluster.consumer.sasl.mechanism: AWS_MSK_IAM
source.cluster.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
source.cluster.producer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
source.cluster.consumer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
source.cluster.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.producer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.consumer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
topics: .*
topics.blacklist: .*[-.]internal,.*.replica,__.*,.*-config,.*-status,.*-offset,source.*,heartbeats
groups.blacklist: console-consumer-.*,connect-.*,__.*
refresh.groups.enabled: true
refresh.groups.interval.seconds: 60
emit.checkpoints.enabled: true
consumer.auto.offset.reset: earliest
producer.linger.ms: 500
producer.retry.backoff.ms: 1000
producer.max.block.ms: 10000
replication.factor: 3
tasks.max: 1
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

MirrorCheckpointConnector:

connector.class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
source.cluster.alias: source
target.cluster.alias: target
clusters: source,target
source.cluster.bootstrap.servers: <SOURCE_BOOTSTRAP_URLS>
target.cluster.bootstrap.servers: <TARGET_BOOTSTRAP_URLS>
target.cluster.security.protocol: SASL_SSL
target.cluster.producer.security.protocol: SASL_SSL
target.cluster.consumer.security.protocol: SASL_SSL
target.cluster.sasl.mechanism: AWS_MSK_IAM
target.cluster.producer.sasl.mechanism: AWS_MSK_IAM
target.cluster.consumer.sasl.mechanism: AWS_MSK_IAM
target.cluster.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
target.cluster.producer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
target.cluster.consumer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
target.cluster.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.producer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.consumer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.security.protocol: SASL_SSL
source.cluster.producer.security.protocol: SASL_SSL
source.cluster.consumer.security.protocol: SASL_SSL
source.cluster.sasl.mechanism: AWS_MSK_IAM
source.cluster.producer.sasl.mechanism: AWS_MSK_IAM
source.cluster.consumer.sasl.mechanism: AWS_MSK_IAM
source.cluster.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
source.cluster.producer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
source.cluster.consumer.sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries='50' awsMaxBackOffTimeMs='1000';"
source.cluster.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.producer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.consumer.sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
topics: .*
topics.blacklist: .*[-.]internal,.*.replica, __.*,.*-config,.*-status,.*-offset,source.*,heartbeats
groups.blacklist: console-consumer-.*,connect-.*,__.*
refresh.groups.enabled: true
refresh.groups.interval.seconds: 60
emit.checkpoints.enabled: true
consumer.auto.offset.reset: earliest
producer.linger.ms: 500
producer.retry.backoff.ms: 1000
producer.max.block.ms: 10000
replication.factor: 3
tasks.max: 1
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
sync.group.offsets.interval.seconds: 5

Hi @brandocomando8

Hmm, strange. I need to take a detailed look at your config.
Basically, MirrorMaker should prevent cycles:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-Cycledetection

In the meantime, you may want to have a look at this existing thread.

Best,
Michael
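
To make the cycle detection from KIP-382 concrete, here is a rough Python sketch of the idea: a topic is not replicated to a cluster whose alias already appears in the topic's prefix chain. The function names below are hypothetical and only loosely modelled on the default replication policy; the "." separator is the default, and treating every dotted name as a prefixed remote topic is a simplification.

```python
from typing import Optional

SEPARATOR = "."  # default separator between cluster alias and topic name


def topic_source(topic: str) -> Optional[str]:
    """Return the leading cluster alias, or None if the topic has no prefix."""
    alias, sep, _ = topic.partition(SEPARATOR)
    return alias if sep else None


def upstream_topic(topic: str) -> str:
    """Strip one alias prefix; return the topic unchanged if it has none."""
    _, sep, rest = topic.partition(SEPARATOR)
    return rest if sep else topic


def is_cycle(topic: str, target_alias: str) -> bool:
    """True if any alias in the topic's prefix chain equals the target alias."""
    while True:
        source = topic_source(topic)
        if source is None:
            return False
        if source == target_alias:
            return True
        topic = upstream_topic(topic)


print(is_cycle("target.test", "target"))         # prefix matches the target alias
print(is_cycle("source.source.test", "target"))  # only "source" prefixes present
```

Under this rule a topic such as source.source.test is not flagged when replicating towards the cluster aliased target, because only the target alias counts as a cycle. That may be why the check does not catch a loop in which the same source prefix keeps being added.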

I did take a look at that issue, and they seem to be trying to replicate without the source prefix, which seemed to be the problem there. In my example, I'm fine with having the source prefix.

So I ended up figuring this out. It seems specific to AWS Kafka Connect: I was running the connector in the primary region, connected to the source MSK cluster, but the blog specifies running it in the secondary region, connected to the target MSK cluster. Apparently whatever config AWS adds behind the scenes was causing the issue.

Thanks for letting us know :slight_smile: