Apache Kafka MirrorMaker 2 (MM2) bidirectional replication A <-> B

We have two clusters: On_prem (10 brokers) and GCP (6 brokers). We have set up MM2 on the GCP Kafka cluster to replicate data to and from On_prem.

Replication from GCP to On_prem:

Replication is running on 5 of the 6 nodes. On the remaining node we see the error below.

[2025-02-03 11:26:54,816] ERROR org.apache.kafka.connect.runtime.distributed.DistributedHerder - [Worker clientId=connect-1, groupId=gcp-mm2] Failed to reconfigure connector's tasks (MirrorCheckpointConnector), retrying after backoff:
org.apache.kafka.connect.runtime.distributed.NotLeaderException: This worker is not able to communicate with the leader of the cluster, which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a distributed Kafka Connect cluster.
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1933)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1851)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$38(DistributedHerder.java:1864)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2051)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:451)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:352)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
[2025-02-03 11:26:54,816] DEBUG org.apache.kafka.connect.runtime.distributed.DistributedHerder - [Worker clientId=connect-1, groupId=gcp-mm2] Ensuring group membership is still active
[2025-02-03 11:26:54,816] DEBUG org.apache.kafka.connect.runtime.distributed.DistributedHerder - [Worker clientId=connect-1, groupId=gcp-mm2] Ensuring group membership is still active

We have tested connectivity between the clusters and everything looks fine. We are unsure whether the five healthy nodes will cover replication of all topics, or whether we are missing events at the destination because of this one node.
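
To see whether all tasks are actually assigned somewhere, and whether the failing node simply holds none, one thing we can do is scrape the Prometheus JMX exporter attached to each MM2 process (port 9191, per the javaagent flag in our service unit) and compare the Connect worker connector/task counts per node. This is only a sketch: the hostnames are placeholders and the exact metric names depend on the rules file passed to jmx_prometheus_javaagent.

# Hostnames are placeholders; metric names depend on the jmx_prometheus rules file.
for host in gcp-mm2-1 gcp-mm2-2 gcp-mm2-3 gcp-mm2-4 gcp-mm2-5 gcp-mm2-6; do
  echo "== ${host} =="
  curl -s "http://${host}:9191/metrics" | grep -iE 'connect.*worker.*(connector_count|task_count)'
done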

Replication from On_prem to GCP:

Replication is running on all 6 nodes.

Below is the mm2.properties file we use:

## Mirror Maker Configurations
# name of the connector, e.g. "us-west->us-east"
name = {{ source_cluster_name }}-{{ dest_cluster_name }}

# Maximum number of tasks to use for this connector
tasks.max = 12
num.stream.threads = 6 

# Setting replication factor of newly created remote topics
replication.factor = 3

errors.log.enable = true
errors.log.include.messages = true

# use ByteArrayConverter to ensure that records are not re-encoded and stay the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

## Kafka clusters aliases
clusters = {{ source_cluster_name }}, {{ dest_cluster_name }}


# upstream cluster to replicate
{{ source_cluster_name }}.bootstrap.servers = {{ source_cluster_ips }}
 

# downstream cluster
{{ dest_cluster_name }}.bootstrap.servers = {{ dest_cluster_ips }}

# enable and configure individual replication flows
{{ source_cluster_name }}->{{ dest_cluster_name }}.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.enabled = true

# whether or not to monitor source cluster for configuration changes
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.topics.configs.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.topics.configs.enabled = true

# regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
{{ source_cluster_name }}->{{ dest_cluster_name }}.topics =  {{ src_to_gcp_topics_to_replicate }}
{{ dest_cluster_name }}->{{ source_cluster_name }}.topics =  {{ gcp_to_src_topics_to_replicate }}

# Configure from where MM2 should start replicating data
{{ source_cluster_name }}->{{ dest_cluster_name }}.consumer.auto.offset.reset = earliest
{{ dest_cluster_name }}->{{ source_cluster_name }}.consumer.auto.offset.reset = earliest

# Sync consumer group offsets
{{ source_cluster_name }}->{{ dest_cluster_name }}.exclude.internal.topics = false
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.heartbeats.enabled = true
 
{{ dest_cluster_name }}->{{ source_cluster_name }}.exclude.internal.topics = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.emit.heartbeats.enabled = true

# Enable automated consumer offset sync
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.group.offsets.enabled = true
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.checkpoints.enabled = true

{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.group.offsets.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.emit.checkpoints.enabled = true

offset.flush.timeout.ms = 60000

 
# {{ source_cluster_name }} cluster overrides for {{ source_cluster_name }}->{{ dest_cluster_name }} replication
{{ source_cluster_name }}.max.poll.records = 20000
{{ source_cluster_name }}.receive.buffer.bytes = 33554432
{{ source_cluster_name }}.send.buffer.bytes = 33554432
{{ source_cluster_name }}.max.partition.fetch.bytes = 33554432
{{ source_cluster_name }}.message.max.bytes = 37755000
{{ source_cluster_name }}.compression.type = gzip
{{ source_cluster_name }}.max.request.size = 26214400
{{ source_cluster_name }}.buffer.memory = 524288000
{{ source_cluster_name }}.batch.size = 524288

# {{ dest_cluster_name }} cluster overrides for {{ dest_cluster_name }}->{{ source_cluster_name }} replication
{{ dest_cluster_name }}.max.poll.records = 20000
{{ dest_cluster_name }}.receive.buffer.bytes = 33554432
{{ dest_cluster_name }}.send.buffer.bytes = 33554432
{{ dest_cluster_name }}.max.partition.fetch.bytes = 33554432
{{ dest_cluster_name }}.message.max.bytes = 37755000
{{ dest_cluster_name }}.compression.type = gzip
{{ dest_cluster_name }}.max.request.size = 26214400
{{ dest_cluster_name }}.buffer.memory = 524288000
{{ dest_cluster_name }}.batch.size = 524288


# SASL Configurations

{{ dest_cluster_name }}.security.protocol=SASL_PLAINTEXT
{{ dest_cluster_name }}.sasl.mechanism=SCRAM-SHA-256
{{ dest_cluster_name }}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ user }}" password="{{ password }}";
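
One quick check for whether both flows are actually emitting heartbeats and checkpoints is to list the MM2-related topics on each cluster. This is a sketch and assumes MM2's default internal topic naming (e.g. a <source-alias>.checkpoints.internal topic on the opposite cluster); add a --command-config with the SASL settings where a cluster is secured.

# List MM2-related topics on both clusters (names assume MM2 defaults).
bin/kafka-topics.sh --bootstrap-server {{ dest_cluster_ips }} --list | grep -E 'heartbeats|checkpoints|mm2-'
bin/kafka-topics.sh --bootstrap-server {{ source_cluster_ips }} --list | grep -E 'heartbeats|checkpoints|mm2-'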

Please let us know if we are missing anything here. Thanks in advance!

hey @Mahesh welcome :slight_smile:

quick question:
how did you start MM2?
in distributed mode?

if so, how exactly?

best,
michael

Hi @mmuehlbeyer

Yes, we have started it and it is running. Here is the systemd unit we use:

[Unit]
Description=MirrorMaker2
ConditionPathExists={{ mirrorMakerTwoDir }}/mm2.properties
After=network.target

[Service]
Type=simple
#User=kafka
#SyslogIdentifier=kafka-mm2
EnvironmentFile={{ mirrorMakerDir }}/heap_env_file.txt
Environment="KAFKA_JMX_OPTS=-javaagent:{{ kafkaLibDir }}/jmx_prometheus_javaagent-0.17.0.jar=9191:{{ kafka_connect_file }}"
ExecStart={{ kafkaBinDir }}/connect-mirror-maker.sh {{ mirrorMakerTwoDir }}/mm2.properties
ExecStop=/usr/bin/pkill {{ kafkaBinDir }}/connect-mirror-maker.sh
#SuccessExitStatus=143
#Restart=always
#RestartSec=5
#StartLimitInterval=0
TimeoutSec=600s

[Install]
WantedBy=multi-user.target

We are running mm2.service ONLY on the GCP cluster, on all 6 brokers, so that it pulls topic data from and pushes topic data to the on-prem Kafka cluster:

GCP → On_prem Kafka
GCP ← On_prem Kafka

Hey,

is my understanding correct that you are running MirrorMaker 2 on each broker in standalone mode?

Hi @mmuehlbeyer

[root@Kafka-p-kfkbcb-ause1-1 bin]# pwd 
/code/kafka/3.4.1/bin 
[root@Kafka-p-kfkbcb-ause1-1 bin]# ll | grep -i connect 
-rwxr-xr-x. 1 kafka kafka 1423 Aug 20 09:29 connect-distributed.sh 
-rwxr-xr-x. 1 kafka kafka 1396 Aug 20 09:29 connect-mirror-maker.sh 
-rwxr-xr-x. 1 kafka kafka 1420 Aug 20 09:29 connect-standalone.sh

We are using connect-mirror-maker.sh for distributed replication. Should we use connect-distributed.sh instead?

Thank you!

running connect-mirror-maker.sh is basically fine,
though running it on each broker is not recommended.

If you would like to run this in a distributed way, you should run it inside a Kafka Connect cluster.

Best,
michael

Could you please share your recommendations for running MM2 with connect-mirror-maker.sh?

Regards,
Mahesh

I think it might be best to run this inside a Kafka Connect cluster,
e.g. as described here:
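
For reference, a minimal sketch of that setup (hosts, ports and connector names below are placeholders): start a plain Kafka Connect cluster with connect-distributed.sh and then submit the MirrorMaker 2 connectors through the Connect REST API, carrying over your topics, tasks.max, SASL and throughput overrides.

# 1) Start a distributed Connect worker (repeat on each Connect node with the same group.id).
bin/connect-distributed.sh config/connect-distributed.properties

# 2) Submit the MM2 source connector for one direction via the Connect REST API.
curl -s -X PUT -H "Content-Type: application/json" \
  http://connect-host:8083/connectors/onprem-to-gcp-mirror/config -d '{
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "{{ source_cluster_name }}",
    "target.cluster.alias": "{{ dest_cluster_name }}",
    "source.cluster.bootstrap.servers": "{{ source_cluster_ips }}",
    "target.cluster.bootstrap.servers": "{{ dest_cluster_ips }}",
    "topics": "{{ src_to_gcp_topics_to_replicate }}",
    "tasks.max": "12",
    "replication.factor": "3",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }'

# 3) Deploy org.apache.kafka.connect.mirror.MirrorCheckpointConnector and
#    org.apache.kafka.connect.mirror.MirrorHeartbeatConnector the same way,
#    plus a second set of connectors for the reverse GCP -> on-prem flow.

Running the connectors on a Connect cluster also gives the workers the REST endpoint they need to forward task reconfigurations to the group leader, which is exactly what the NotLeaderException above complains is missing in dedicated mode.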