MirrorMaker 2 creating heartbeats topics recursively in a loop in active-active mode

running within Docker
MirrorMaker with Kafka 3.4 → 2.8.1 works basically as expected

kafka-topics --list --bootstrap-server localhost:9095
A.checkpoints.internal
A.heartbeats
__consumer_offsets
heartbeats
mm2-configs.A.internal
mm2-offset-syncs.A.internal
mm2-offsets.A.internal
mm2-status.A.internal
source-34
kafka-topics --list --bootstrap-server localhost:9094
B.checkpoints.internal
B.source-34
__consumer_offsets
heartbeats
mm2-configs.B.internal
mm2-offset-syncs.B.internal
mm2-offsets.B.internal
mm2-status.B.internal

Could you please share the configuration file you’re using? We suspect that there might be an issue with the configuration.

Additionally, was your test executed in active-active mode? We encountered issues when enabling reverse replication from version 3.4 to 2.8.1, although forward replication from version 2.8.1 to 3.4 worked without any problems.

Thanks again

I’ve used the copy you provided above.

clusters = A, B

# connection information for each cluster
# This is a comma-separated list of host:port pairs for each cluster
# e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = localhost:9094
B.bootstrap.servers = localhost:9095

# enable and configure individual replication flows
A->B.enabled = true

# regex which defines which topics get replicated, e.g. "foo-.*"
A->B.topics = .*

B->A.enabled = true
B->A.topics = .*

# Setting replication factor of newly created remote topics
replication.factor=1

############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1


errors.log.enable = true
errors.log.include.messages = true

tested in active-active mode with Bitnami Docker images.
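
for reference, a minimal sketch of how such an instance is launched as a dedicated MM2 cluster (paths are from my setup, adjust to yours):

 $KAFKA_HOME/bin/connect-mirror-maker.sh mm2.properties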

you used Confluent binaries, right?

We are using open-source Apache Kafka on CentOS 7.

ah I see
so basically the same as I do
strange, not able to reproduce it yet, even with load on both clusters

Hi Michael,

One more issue noticed in the activity: MM2 is creating heartbeats topics with a prefix we didn't even provide in the config file.

We have provided two sources for forward replication:

  • src. = source 1
  • src_mcb. = source 2
  • gcp. = destination (for reverse replication)

Here is the list of heartbeats topics created. It's weird to see source.heartbeats topics here, and we are unable to find the root cause. Is source. some default prefix in MM2?

heartbeats
source.heartbeats
source.src.heartbeats
source.src.src.heartbeats
src.gcp.heartbeats
src.gcp.src.gcp.heartbeats
src.gcp.src.gcp.src.heartbeats
src.gcp.src.heartbeats
src.gcp.src.src.heartbeats
src.gcp.src_mcb.heartbeats
src.heartbeats
src.src.heartbeats
src_mcb.gcp.heartbeats
src_mcb.gcp.source.heartbeats
src_mcb.heartbeats

We appreciate your valuable input here

Thanks and regards

mmh looks a bit weird

from my understanding it should look like
$yoursourcealias.*
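
each hop prepends its source alias to the topic name (that's the DefaultReplicationPolicy), so as a rough sketch with your aliases:

 heartbeats           on gcp
 gcp.heartbeats       on src       (after gcp -> src)
 src.gcp.heartbeats   back on gcp  (if src -> gcp picks it up again)

and iirc the connector-level default for source.cluster.alias is literally "source", so a bare source. prefix could come from a connector instance started without an explicit alias (just a guess though)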

you're running one MM2 instance, right?

and the config for the above?

Here’s the config file

## Mirror Maker Configurations
# name of the connector, e.g. "us-west->us-east"
name = src,src_mcb-dst

# Maximum number of tasks to use for this connector
tasks.max = 4
# Setting replication factor of newly created remote topics
replication.factor = 3

errors.log.enable = true
errors.log.include.messages = true

# use ByteArrayConverter to ensure that records are not re-encoded and stay the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

## Kafka clusters aliases
clusters = src, src_mcb, dst


# upstream cluster to replicate
src.bootstrap.servers = x.x.x.x:9092,y.y.y.y:9092,z.z.z.z:9092
# For MCB to AWS-shared
src_mcb.bootstrap.servers = x.x.x.x:9092,y.y.y.y:9092,z.z.z.z:9092

# downstream cluster
dst.bootstrap.servers = x.x.x.x:9092,y.y.y.y:9092,z.z.z.z:9092

# enable and configure individual replication flows
src->dst.enabled = true
# For MCB to AWS-shared
src_mcb->dst.enabled = true

# whether or not to monitor source cluster for configuration changes
src->dst.sync.topics.configs.enabled = true
# For MCB to AWS-shared
src_mcb->dst.sync.topics.configs.enabled = true

# regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
src->dst.topics = daaaa|bbbb|cccc

# For MCB to AWS-shared
src_mcb->dst.topics = aaaa|bbbb|cccc

# Configure from when MM2 should start replicating data
src->dst.consumer.auto.offset.reset = latest
# For MCB to AWS-shared
src_mcb->dst.consumer.auto.offset.reset = latest

# Sync consumer group offsets
src->dst.exclude.internal.topics = false
src->dst.emit.heartbeats.enabled = true
# For MCB to AWS-shared
src_mcb->dst.exclude.internal.topics = false
src_mcb->dst.emit.heartbeats.enabled = true

# Enable automated consumer offset sync
src->dst.sync.group.offsets.enabled = true
src->dst.emit.checkpoints.enabled = true
offset.flush.timeout.ms = 60000
# For MCB to AWS-shared
src_mcb->dst.sync.group.offsets.enabled = true
src_mcb->dst.emit.checkpoints.enabled = true

# SASL Configurations

dst.security.protocol=SASL_PLAINTEXT
dst.sasl.mechanism=SCRAM-SHA-256
dst.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="xxxxx";


So our working architecture is as follows:

  • src Kafka Version: 2.8.0
  • src_mcb Version: 2.5.1
  • dst Kafka Version: 3.4.1

Forward replication will be from the src and src_mcb clusters to the dst cluster, and reverse replication will be from dst back to the src and src_mcb clusters.

Earlier, forward replication was working fine, but one Connect worker went down; after we restarted the service it created source.-prefixed heartbeats topics, which we never specified in our config file.

Reverse replication from the dst cluster to the src_mcb (2.5.1) cluster works well, but creates heartbeats topics recursively.

Btw, we are still stuck on reverse replication from the dst cluster to the src cluster (2.8.0), as it's not even creating the actual topics specified in the config file.

regarding the recursively created topics, have a look at this issue:
https://issues.apache.org/jira/browse/KAFKA-9914

→ scroll down to the bottom, does this help you somehow?
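
chains like src.gcp.src.gcp.heartbeats usually mean the cycle detection (which looks for the target alias inside the topic name) never matches, e.g. because separate MM2 processes use different alias sets for the same physical clusters. a hedged sketch of damping it, assuming your version honors topics.exclude for heartbeats topics (worth testing; pre-3.0 the property was called topics.blacklist):

 src->dst.topics.exclude = .*[\-\.]internal, .*\.replica, __.*, .*\.heartbeats
 dst->src.emit.heartbeats.enabled = false

the first three patterns are just the defaults, kept in place so the internal topics stay excluded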

and I do not see any replication flow from
dst --> src or dst --> src_mcb

or did I overlook something?
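
for reference, declared in a single mm2.properties the reverse flows would look roughly like this (topic lists are placeholders):

 dst->src.enabled = true
 dst->src.topics = aaaa|bbbb|cccc
 dst->src_mcb.enabled = true
 dst->src_mcb.topics = aaaa|bbbb|cccc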

best,
michael

That's a different MM2 Connect process running on the src cluster, as below:

## Mirror Maker Configurations
# name of the connector, e.g. "us-west->us-east"
name = aws-bcb
# Maximum number of tasks to use for this connector
tasks.max = 4
# Setting replication factor of newly created remote topics
replication.factor = 3

errors.log.enable = true
errors.log.include.messages = true

# use ByteArrayConverter to ensure that records are not re-encoded and stay the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

## Kafka clusters aliases
clusters = aws, bcb
# upstream cluster to replicate
aws.bootstrap.servers = x.x.x.x:9092,y.y.y.y:9092,z.z.z.z:9092

# downstream cluster
bcb.bootstrap.servers = x.x.x.x:9092,y.y.y.y:9092,z.z.z.z:9092

# enable and configure individual replication flows
aws->bcb.enabled = true

# whether or not to monitor source cluster for configuration changes
aws->bcb.sync.topics.configs.enabled = true

# regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
aws->bcb.topics =  aaaa|bbbb|cccc

# Configure from when MM2 should start replicating data
aws->bcb.consumer.auto.offset.reset = latest

# Sync consumer group offsets
aws->bcb.exclude.internal.topics = false
aws->bcb.emit.heartbeats.enabled = true

# Enable automated consumer offset sync
aws->bcb.sync.group.offsets.enabled = true
aws->bcb.emit.checkpoints.enabled = true
offset.flush.timeout.ms = 60000

# SASL Configurations

aws.security.protocol=SASL_PLAINTEXT
aws.sasl.mechanism=SCRAM-SHA-256
aws.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="xxxxxx";

And a different MM2 Connect process running on the src_mcb cluster, as below:

## Mirror Maker Configurations
# name of the connector, e.g. "us-west->us-east"
name = aws-mcb
# Maximum number of tasks to use for this connector
tasks.max = 4
# Setting replication factor of newly created remote topics
replication.factor = 3

errors.log.enable = true
errors.log.include.messages = true

# use ByteArrayConverter to ensure that records are not re-encoded and stay the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

## Kafka clusters aliases
clusters = aws, mcb
# upstream cluster to replicate
aws.bootstrap.servers = x.x.x.x:9092,y.y.y.y:9092,z.z.z.z:9092

# downstream cluster
mcb.bootstrap.servers = x.x.x.x:9092,y.y.y.y:9092,z.z.z.z:9092

# enable and configure individual replication flows
aws->mcb.enabled = true

# whether or not to monitor source cluster for configuration changes
aws->mcb.sync.topics.configs.enabled = true

# regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
aws->mcb.topics =  aaaa|bbbb|cccc

# Configure from when MM2 should start replicating data
aws->mcb.consumer.auto.offset.reset = latest

# Sync consumer group offsets
aws->mcb.exclude.internal.topics = false
aws->mcb.emit.heartbeats.enabled = true

# Enable automated consumer offset sync
aws->mcb.sync.group.offsets.enabled = true
aws->mcb.emit.checkpoints.enabled = true
offset.flush.timeout.ms = 60000

# SASL Configurations

aws.security.protocol=SASL_PLAINTEXT
aws.sasl.mechanism=SCRAM-SHA-256
aws.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="xxxxxx";

ok I see

so aws → mcb and aws → bcb are not working correctly?

I’m a bit lost with the different names now :smiley:

Apologies for confusing you :sweat_smile:

aws (3.4.1) → mcb (2.5.1) works well, but creates heartbeats topics recursively

for aws (3.4.1) → bcb (2.8.0), it's not even creating the actual topics specified in the config file.

short update:

3.4.1 → 2.5.1 works fine for me

B.checkpoints.internal
B.demo-mr
B.heartbeats
__consumer_offsets
heartbeats
mm2-configs.B.internal
mm2-offsets.B.internal
mm2-status.B.internal

__consumer_offsets
demo-mr
heartbeats
mm2-configs.A.internal
mm2-offset-syncs.A.internal
mm2-offsets.A.internal
mm2-status.A.internal

qq: from which Kafka home did you start the MirrorMaker?

was also able to get 3.4.1 → 2.8 to work, but only when explicitly setting the topic to be mirrored
the regex is not working properly

what works

 B->A.enabled = true
 B->A.topics = abc

what does not work

 B->A.enabled = true
 B->A.topics = .*

to be complete the following works for me

 B->A.topics = cdef
 B->A.topics = bla

Many thanks for the support, we appreciate it.

Regarding your test of explicitly setting the topics to be mirrored, as below:

B->A.topics = cdef
B->A.topics = bla

We will implement this in our environment and keep you updated, and hopefully will be able to mark this as resolved :smiley:.

Thanks again

One doubt:

If we list each topic explicitly on a separate line, we have almost 300 topics to replicate, so adding 300 lines just for topics in mm2.properties would make it bloated, right?

I think a bit, yes
not sure why the config is not working for that particular setup

maybe some tweaks/tests with the regex are needed
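
one thing worth double-checking: a plain Java properties file keeps only the last value for a repeated key, so several X->Y.topics lines leave just the final one active. a single pipe-separated value covers them all, and a trailing backslash lets you wrap it for readability (topic names are placeholders):

 X->Y.topics = aaaa|bbbb|cccc|\
   dddd|eeee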

@mmuehlbeyer Good day

We tried the resolution provided above, but unfortunately we are still stuck with the topic creation :cry:. MM2 is not creating the topics we provide in the X->Y.topics property; it is only creating the internal topics:

mm2-configs.X.internal
mm2-offsets.X.internal
mm2-status.X.internal
heartbeats

Versions:

X - 3.4.1
Y - 2.8.0

Our config file is as follows:

## Mirror Maker Configurations
# name of the connector, e.g. "us-west->us-east"
name = X-Y

# Maximum number of tasks to use for this connector
tasks.max = 4
# Setting replication factor of newly created remote topics
replication.factor = 3

errors.log.enable = true
errors.log.include.messages = true

# use ByteArrayConverter to ensure that records are not re-encoded and stay the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

## Kafka clusters aliases
clusters = X, Y


# upstream cluster to replicate
X.bootstrap.servers = x.x.x.x:9092,y.y.y.y:9092,z.z.z.z:9092

# downstream cluster
Y.bootstrap.servers = x.x.x.x:9092,y.y.y.y:9092,z.z.z.z:9092

# enable and configure individual replication flows
X->Y.enabled = true

# whether or not to monitor source cluster for configuration changes
X->Y.sync.topics.configs.enabled = true

# regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
X->Y.topics = eeee
X->Y.topics = dddd
X->Y.topics = cccc
X->Y.topics = bbbb
X->Y.topics = aaaa

# Configure from when MM2 should start replicating data
X->Y.consumer.auto.offset.reset = latest

# Sync consumer group offsets
X->Y.exclude.internal.topics = false
X->Y.emit.heartbeats.enabled = true

# Enable automated consumer offset sync
X->Y.sync.group.offsets.enabled = true
X->Y.emit.checkpoints.enabled = true
offset.flush.timeout.ms = 60000

# SASL Configurations

Y.security.protocol=SASL_PLAINTEXT
Y.sasl.mechanism=SCRAM-SHA-256
Y.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="xxxxxx";

X.security.protocol=SASL_PLAINTEXT
X.sasl.mechanism=SCRAM-SHA-256
X.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="yyyyyy";
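
Side note: since a repeated key in a properties file is overwritten on load, presumably only the last line (X->Y.topics = aaaa) is in effect above; combining them keeps all five:

 X->Y.topics = eeee|dddd|cccc|bbbb|aaaa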