We have two Kafka clusters: one on-prem (10 brokers) and one on GCP (6 brokers). We have set up MirrorMaker 2 (MM2) on the GCP cluster to replicate data to and from the on-prem cluster.
Replication from GCP to on-prem:
Replication is running on 5 of the 6 nodes. On the remaining node we see the following error:
[2025-02-03 11:26:54,816] ERROR org.apache.kafka.connect.runtime.distributed.DistributedHerder - [Worker clientId=connect-1, groupId=gcp-mm2] Failed to reconfigure connector's tasks (MirrorCheckpointConnector), retrying after backoff:
org.apache.kafka.connect.runtime.distributed.NotLeaderException: This worker is not able to communicate with the leader of the cluster, which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a distributed Kafka Connect cluster.
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1933)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1851)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$38(DistributedHerder.java:1864)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2051)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:451)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:352)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[2025-02-03 11:26:54,816] DEBUG org.apache.kafka.connect.runtime.distributed.DistributedHerder - [Worker clientId=connect-1, groupId=gcp-mm2] Ensuring group membership is still active
[2025-02-03 11:26:54,816] DEBUG org.apache.kafka.connect.runtime.distributed.DistributedHerder - [Worker clientId=connect-1, groupId=gcp-mm2] Ensuring group membership is still active
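The message above suggests, as an alternative to dedicated mode, deploying the MirrorMaker 2 connectors directly onto a distributed Kafka Connect cluster. For reference, our understanding is that this would mean creating MirrorSourceConnector (and similarly MirrorCheckpointConnector and MirrorHeartbeatConnector) through the Connect REST API, roughly like the sketch below; the Connect host, cluster aliases, bootstrap servers and topic list are placeholders:

# Sketch only -- placeholder host/aliases; assumes an existing distributed Connect cluster
curl -s -X POST -H "Content-Type: application/json" http://connect-host:8083/connectors -d '{
  "name": "gcp-to-onprem-mirror-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "gcp",
    "target.cluster.alias": "on_prem",
    "source.cluster.bootstrap.servers": "gcp-broker1:9092,gcp-broker2:9092",
    "target.cluster.bootstrap.servers": "onprem-broker1:9092,onprem-broker2:9092",
    "topics": "topic1|topic2",
    "tasks.max": "12",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}'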
We have tested connectivity between the clusters and everything looks fine. We are unsure whether the remaining five nodes will cover replication of all topics, or whether we are missing events at the destination because of this one node.
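To check whether data is actually flowing through the remaining five workers, we are planning checks along the lines of the sketch below (broker hosts and topic names are placeholders, and "gcp" stands for whatever alias we template in for the GCP cluster):

# Sketch only -- placeholder hosts/topics
# 1) Since emit.heartbeats.enabled = true, the replicated heartbeats topic on the
#    destination should keep receiving records if the gcp->on_prem flow is alive
#    (values are binary; we only care that new records keep appearing):
kafka-console-consumer.sh --bootstrap-server onprem-broker:9092 \
  --topic gcp.heartbeats --from-beginning --max-messages 5
# 2) Compare end offsets of a source topic with its mirrored counterpart
#    (kafka-get-offsets.sh in Kafka 3.0+; kafka.tools.GetOffsetShell on older releases):
kafka-get-offsets.sh --bootstrap-server gcp-broker:9092 --topic some-topic
kafka-get-offsets.sh --bootstrap-server onprem-broker:9092 --topic gcp.some-topic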
Replication from on-prem to GCP:
Replication is running on all 6 nodes.
Below is the mm2.properties file we use:
## Mirror Maker Configurations
# name of the connector, e.g. "us-west->us-east"
name = {{ source_cluster_name }}-{{ dest_cluster_name }}
# Maximum number of tasks to use for this connector
tasks.max = 12
num.stream.threads = 6
# Setting replication factor of newly created remote topics
replication.factor = 3
errors.log.enable = true
errors.log.include.messages = true
# use ByteArrayConverter to ensure that records are not re-encoded and stay the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
## Kafka clusters aliases
clusters = {{ source_cluster_name }}, {{ dest_cluster_name }}
# upstream cluster to replicate
{{ source_cluster_name }}.bootstrap.servers = {{ source_cluster_ips }}
# downstream cluster
{{ dest_cluster_name }}.bootstrap.servers = {{ dest_cluster_ips }}
# enable and configure individual replication flows
{{ source_cluster_name }}->{{ dest_cluster_name }}.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.enabled = true
# whether or not to monitor source cluster for configuration changes
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.topic.configs.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.topic.configs.enabled = true
# regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
{{ source_cluster_name }}->{{ dest_cluster_name }}.topics = {{ src_to_gcp_topics_to_replicate }}
{{ dest_cluster_name }}->{{ source_cluster_name }}.topics = {{ gcp_to_src_topics_to_replicate }}
# Configure where MM2 should start replicating data from
{{ source_cluster_name }}->{{ dest_cluster_name }}.consumer.auto.offset.reset = earliest
{{ dest_cluster_name }}->{{ source_cluster_name }}.consumer.auto.offset.reset = earliest
# Replicate internal topics and emit heartbeats
{{ source_cluster_name }}->{{ dest_cluster_name }}.exclude.internal.topics = false
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.heartbeats.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.exclude.internal.topics = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.emit.heartbeats.enabled = true
# Enable automated consumer offset sync
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.group.offsets.enabled = true
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.checkpoints.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.group.offsets.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.emit.checkpoints.enabled = true
offset.flush.timeout.ms = 60000
# {{ source_cluster_name }} cluster overrides for {{ source_cluster_name }}->{{ dest_cluster_name }} replication
{{ source_cluster_name }}.max.poll.records = 20000
{{ source_cluster_name }}.receive.buffer.bytes = 33554432
{{ source_cluster_name }}.send.buffer.bytes = 33554432
{{ source_cluster_name }}.max.partition.fetch.bytes = 33554432
{{ source_cluster_name }}.message.max.bytes = 37755000
{{ source_cluster_name }}.compression.type = gzip
{{ source_cluster_name }}.max.request.size = 26214400
{{ source_cluster_name }}.buffer.memory = 524288000
{{ source_cluster_name }}.batch.size = 524288
# {{ dest_cluster_name }} cluster overrides for {{ dest_cluster_name }}->{{ source_cluster_name }} replication
{{ dest_cluster_name }}.max.poll.records = 20000
{{ dest_cluster_name }}.receive.buffer.bytes = 33554432
{{ dest_cluster_name }}.send.buffer.bytes = 33554432
{{ dest_cluster_name }}.max.partition.fetch.bytes = 33554432
{{ dest_cluster_name }}.message.max.bytes = 37755000
{{ dest_cluster_name }}.compression.type = gzip
{{ dest_cluster_name }}.max.request.size = 26214400
{{ dest_cluster_name }}.buffer.memory = 524288000
{{ dest_cluster_name }}.batch.size = 524288
# SASL Configurations
{{ dest_cluster_name }}.security.protocol=SASL_PLAINTEXT
{{ dest_cluster_name }}.sasl.mechanism=SCRAM-SHA-256
{{ dest_cluster_name }}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ user }}" password="{{ password }}";
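Separately, since we run MirrorMaker 2 in dedicated mode, we are wondering whether the NotLeaderException means we also need to enable the internal REST server so that follower workers can forward task configurations to the leader (KIP-710, available from Kafka 3.5). If that is the right direction, we believe the addition would look roughly like the following (the listener host/port is a placeholder, and the workers would need to reach each other on it):

# Sketch only -- assumes Kafka 3.5+ dedicated mode (KIP-710); listener is a placeholder
dedicated.mode.enable.internal.rest = true
listeners = http://0.0.0.0:8083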
Please let us know if we are missing anything here. Thanks in advance!