Hi Team,
We are running Apache Kafka MirrorMaker 2 (MM2) in active/active mode via connect-mirror-maker.sh on a 6-node Apache Kafka cluster (v3.8.1). The source cluster runs Kafka 2.8.0. Below is a summary of our setup and the issue we're facing.
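For reference, each node starts the dedicated MM2 driver that ships with Kafka, roughly like this (the install and config paths are placeholders from our environment):

/opt/kafka/bin/connect-mirror-maker.sh /opt/kafka/config/mm2.properties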
Setup
- MM2 is running on the destination Kafka cluster (3.8.1).
- We have bidirectional replication between clusters.
- We are using a standard MM2 config with the following error-handling setup:
### Configure MM2 to manage corrupt records - start
# retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.retry.timeout = 600000
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.retry.timeout = 600000
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.retry.delay.max.ms = 30000
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.retry.delay.max.ms = 30000
# log error context along with application logs, but do not include configs and messages
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.log.enable = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.log.enable = true
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.log.include.messages = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.log.include.messages = false
# produce error context into the Kafka topic
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.deadletterqueue.topic.name = dbeng-mm2-forward-deadletterqueue
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.deadletterqueue.topic.name = dbeng-mm2-backward-deadletterqueue
# Tolerate all errors.
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.tolerance = all
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.tolerance = all
### Configure MM2 to manage corrupt records - end
We also tried adding these at the top (worker) level, without a flow prefix:
errors.log.enable = true
errors.log.include.messages = true
Issue
Roughly every two weeks, when MM2 encounters a corrupt record on the source, the affected node stops replicating. Lag then grows on the affected topics and partitions, and the destination cluster can no longer fetch data from that source node. Despite the error-handling configs above, replication stays stuck until we manually restart the MM2 process on all 6 nodes (restarting only the affected node does not resume replication).
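To check whether MM2 is actually producing any error context, the DLQ topics can be inspected with the console consumer (the broker address below is a placeholder):

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server dest-broker:9092 --topic dbeng-mm2-forward-deadletterqueue --from-beginning --max-messages 10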
For reference, here is our full MM2 config:
## Mirror Maker Configurations
# name of the connector, e.g. "us-west->us-east"
name = {{ source_cluster_name }}-{{ dest_cluster_name }}
# Maximum number of tasks to use for this connector
tasks.max = 12
num.stream.threads = 6
# Setting replication factor of newly created remote topics
replication.factor = 3
errors.log.enable = true
errors.log.include.messages = true
# use ByteArrayConverter to ensure that records are not re-encoded and stay the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
# enable the internal REST server needed for multi-node dedicated mode (KIP-710)
dedicated.mode.enable.internal.rest = true
## Kafka clusters aliases
clusters = {{ source_cluster_name }}, {{ dest_cluster_name }}
# upstream cluster to replicate
{{ source_cluster_name }}.bootstrap.servers = {{ source_cluster_ips }}
# downstream cluster
{{ dest_cluster_name }}.bootstrap.servers = {{ dest_cluster_ips }}
# enable and configure individual replication flows
{{ source_cluster_name }}->{{ dest_cluster_name }}.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.enabled = true
# monitor the source cluster for topic configuration changes
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.topic.configs.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.topic.configs.enabled = true
# do not monitor the source cluster for ACL changes
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.topic.acls.enabled = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.topic.acls.enabled = false
# regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
{{ source_cluster_name }}->{{ dest_cluster_name }}.topics = {{ src_to_gcp_topics_to_replicate }}
{{ dest_cluster_name }}->{{ source_cluster_name }}.topics = {{ gcp_to_src_topics_to_replicate }}
# configure where MM2 should start replicating data from
{{ source_cluster_name }}->{{ dest_cluster_name }}.consumer.auto.offset.reset = latest
{{ dest_cluster_name }}->{{ source_cluster_name }}.consumer.auto.offset.reset = latest
# include internal topics and emit heartbeats
{{ source_cluster_name }}->{{ dest_cluster_name }}.exclude.internal.topics = false
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.heartbeats.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.exclude.internal.topics = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.emit.heartbeats.enabled = true
# Enable automated consumer offset sync
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.group.offsets.enabled = true
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.checkpoints.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.group.offsets.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.emit.checkpoints.enabled = true
offset.flush.timeout.ms = 60000
### Configure MM2 to manage corrupt records - start
# retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.retry.timeout = 600000
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.retry.timeout = 600000
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.retry.delay.max.ms = 30000
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.retry.delay.max.ms = 30000
# log error context along with application logs, but do not include configs and messages
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.log.enable = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.log.enable = true
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.log.include.messages = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.log.include.messages = false
# produce error context into the Kafka topic
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.deadletterqueue.topic.name = dbeng-mm2-forward-deadletterqueue
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.deadletterqueue.topic.name = dbeng-mm2-backward-deadletterqueue
# Tolerate all errors.
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.tolerance = all
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.tolerance = all
### Configure MM2 to manage corrupt records - end
# Forward: src -> tgt
# per-cluster client overrides, see https://kafka.apache.org/38/documentation/#georeplication ({source}.consumer.{consumer_config_name})
{{ source_cluster_name }}.consumer.max.poll.records = 20000
{{ source_cluster_name }}.consumer.receive.buffer.bytes = 33554432
{{ source_cluster_name }}.consumer.send.buffer.bytes = 33554432
{{ source_cluster_name }}.consumer.max.partition.fetch.bytes = 33554432
{{ source_cluster_name }}.producer.message.max.bytes = 37755000
{{ source_cluster_name }}.producer.compression.type = gzip
{{ source_cluster_name }}.producer.max.request.size = 26214400
{{ source_cluster_name }}.producer.buffer.memory = 524288000
{{ source_cluster_name }}.producer.batch.size = 524288
# Backward: tgt -> src
{{ dest_cluster_name }}.consumer.max.poll.records = 20000
{{ dest_cluster_name }}.consumer.receive.buffer.bytes = 33554432
{{ dest_cluster_name }}.consumer.send.buffer.bytes = 33554432
{{ dest_cluster_name }}.consumer.max.partition.fetch.bytes = 33554432
{{ dest_cluster_name }}.producer.message.max.bytes = 37755000
{{ dest_cluster_name }}.producer.compression.type = gzip
{{ dest_cluster_name }}.producer.max.request.size = 26214400
{{ dest_cluster_name }}.producer.buffer.memory = 524288000
{{ dest_cluster_name }}.producer.batch.size = 524288
# SASL Configurations
# only the destination cluster has authentication/authorization enabled
{{ dest_cluster_name }}.security.protocol=SASL_PLAINTEXT
{{ dest_cluster_name }}.sasl.mechanism=SCRAM-SHA-256
{{ dest_cluster_name }}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ kafkaAdminUser }}" password="{{ kafka_admin_password }}";
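For completeness, the workaround we use today is restarting MM2 on all nodes; assuming a systemd unit named mm2 (the unit name is specific to our environment), that is:

sudo systemctl restart mm2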
Questions
- Is there a way to make MM2 skip corrupt records gracefully and continue replicating?
- Are we missing any critical config to enable proper dead-letter handling or error resilience?
- Has anyone faced a similar issue with corrupt records stalling replication in MM2 active/active setups?
This is causing a critical impact in our production setup, and we’d appreciate any guidance or best practices to handle such scenarios more gracefully.
Thanks in advance!