MM2 Stops Replicating on Corrupt Record – Need Guidance for Active/Active Setup

Hi Team,

We are running Apache MirrorMaker 2 (MM2) in active/active mode on a 6-node Apache Kafka cluster (v3.8.1), started via connect-mirror-maker.sh roughly as sketched below; the source cluster is running Kafka 2.8.0. A summary of our setup and the issue we're facing follows.
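
Each node runs the dedicated MM2 driver against a single properties file (a sketch; mm2.properties is a placeholder for the templated config shown in full further down):

# start one MM2 node in dedicated mode
bin/connect-mirror-maker.sh mm2.properties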

Setup

  • MM2 is running on the destination Kafka cluster (3.8.1).
  • We have bidirectional replication between clusters.
  • We are using a standard MM2 config with the following error-handling setup:
### Configure MM2 to manage corrupt records - start

# retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.retry.timeout = 600000
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.retry.timeout = 600000
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.retry.delay.max.ms = 30000
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.retry.delay.max.ms = 30000

# log error context along with application logs, but do not include configs and messages
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.log.enable = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.log.enable = true
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.log.include.messages = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.log.include.messages = false

# produce error context into the Kafka topic
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.deadletterqueue.topic.name = dbeng-mm2-forward-deadletterqueue
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.deadletterqueue.topic.name = dbeng-mm2-backward-deadletterqueue

# Tolerate all errors.
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.tolerance = all
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.tolerance = all

### Configure MM2 to manage corrupt records - end
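
To sanity-check whether anything actually lands in those DLQ topics, we can consume them directly. A minimal sketch, assuming a reachable destination broker (dest-broker:9092 is a placeholder) and a client.properties file carrying the destination cluster's SASL settings (a sketch of such a file follows the full config further down):

# peek at the forward DLQ topic, printing record headers along with the payload
bin/kafka-console-consumer.sh --bootstrap-server dest-broker:9092 \
  --topic dbeng-mm2-forward-deadletterqueue \
  --from-beginning \
  --property print.headers=true \
  --consumer.config client.properties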

We also tried adding these:

errors.log.enable = true
errors.log.include.messages = true

Issue

Every two weeks or so, when MM2 encounters a corrupt record from the source, the affected node stops replicating data. This leads to increasing lag on certain topics and partitions, and the destination cluster can no longer fetch data from that source node. Despite the error-handling configs above, replication remains stuck until we manually restart the MM2 process on all 6 nodes (restarting only the affected node does not get replication going again).


FYI, here is our full MM2 config:

## Mirror Maker Configurations
# name of the connector, e.g. "us-west->us-east"
name = {{ source_cluster_name }}-{{ dest_cluster_name }}

# Maximum number of tasks to use for this connector
tasks.max = 12
num.stream.threads = 6 

# Setting replication factor of newly created remote topics
replication.factor = 3

errors.log.enable = true
errors.log.include.messages = true

# use ByteArrayConverter to ensure that records are not re-encoded and stay the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

# enable the internal REST server so the dedicated MM2 nodes can run in distributed mode (Kafka 3.8.1)
dedicated.mode.enable.internal.rest = true

## Kafka clusters aliases
clusters = {{ source_cluster_name }}, {{ dest_cluster_name }}


# upstream cluster to replicate
{{ source_cluster_name }}.bootstrap.servers = {{ source_cluster_ips }}
 

# downstream cluster
{{ dest_cluster_name }}.bootstrap.servers = {{ dest_cluster_ips }}

# enable and configure individual replication flows
{{ source_cluster_name }}->{{ dest_cluster_name }}.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.enabled = true    

# whether or not to monitor source cluster for configuration changes
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.topics.configs.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.topics.configs.enabled = true

# Do not monitor the source cluster for ACL changes
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.topic.acls.enabled = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.topic.acls.enabled = false

# regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
{{ source_cluster_name }}->{{ dest_cluster_name }}.topics =  {{ src_to_gcp_topics_to_replicate }}
{{ dest_cluster_name }}->{{ source_cluster_name }}.topics =  {{ gcp_to_src_topics_to_replicate }}

# Configure from where MM2 should start replicating data
{{ source_cluster_name }}->{{ dest_cluster_name }}.consumer.auto.offset.reset = latest
{{ dest_cluster_name }}->{{ source_cluster_name }}.consumer.auto.offset.reset = latest

# Do not exclude internal topics; emit heartbeats
{{ source_cluster_name }}->{{ dest_cluster_name }}.exclude.internal.topics = false
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.heartbeats.enabled = true
 
{{ dest_cluster_name }}->{{ source_cluster_name }}.exclude.internal.topics = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.emit.heartbeats.enabled = true

# Enable automated consumer offset sync
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.group.offsets.enabled = true
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.checkpoints.enabled = true

{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.group.offsets.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.emit.checkpoints.enabled = true

offset.flush.timeout.ms = 60000

### Configure MM2 to manage corrupt records - start
# retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.retry.timeout = 600000
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.retry.timeout = 600000

{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.retry.delay.max.ms = 30000
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.retry.delay.max.ms = 30000

# log error context along with application logs, but do not include configs and messages
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.log.enable = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.log.enable = true

{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.log.include.messages = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.log.include.messages = false

# produce error context into the Kafka topic
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.deadletterqueue.topic.name = dbeng-mm2-forward-deadletterqueue
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.deadletterqueue.topic.name = dbeng-mm2-backward-deadletterqueue

# Tolerate all errors.
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.tolerance = all
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.tolerance = all
### Configure MM2 to manage corrupt records - end

# Forward: src -> tgt
#https://kafka.apache.org/38/documentation/#georeplication:~:text=%7Bsource%7D.consumer.%7Bconsumer_config_name%7D
{{ source_cluster_name }}.consumer.max.poll.records = 20000
{{ source_cluster_name }}.consumer.receive.buffer.bytes = 33554432
{{ source_cluster_name }}.consumer.send.buffer.bytes = 33554432
{{ source_cluster_name }}.consumer.max.partition.fetch.bytes = 33554432

{{ source_cluster_name }}.producer.message.max.bytes = 37755000
{{ source_cluster_name }}.producer.compression.type = gzip
{{ source_cluster_name }}.producer.max.request.size = 26214400
{{ source_cluster_name }}.producer.buffer.memory = 524288000
{{ source_cluster_name }}.producer.batch.size = 524288

# Backward: tgt -> src
{{ dest_cluster_name }}.consumer.max.poll.records = 20000
{{ dest_cluster_name }}.consumer.receive.buffer.bytes = 33554432
{{ dest_cluster_name }}.consumer.send.buffer.bytes = 33554432
{{ dest_cluster_name }}.consumer.max.partition.fetch.bytes = 33554432

{{ dest_cluster_name }}.producer.message.max.bytes = 37755000
{{ dest_cluster_name }}.producer.compression.type = gzip
{{ dest_cluster_name }}.producer.max.request.size = 26214400
{{ dest_cluster_name }}.producer.buffer.memory = 524288000
{{ dest_cluster_name }}.producer.batch.size = 524288



# SASL Configurations

# only the destination cluster has authentication/authorization enabled
{{ dest_cluster_name }}.security.protocol=SASL_PLAINTEXT
{{ dest_cluster_name }}.sasl.mechanism=SCRAM-SHA-256
{{ dest_cluster_name }}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ kafkaAdminUser }}" password="{{ kafka_admin_password }}";
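
Since the destination cluster requires SCRAM over SASL_PLAINTEXT, the stock CLI tools need an equivalent client config to connect. A minimal sketch (the file name client.properties is a placeholder; the values mirror the settings above):

# client.properties for CLI tools (kafka-console-consumer.sh etc.) against the destination cluster
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ kafkaAdminUser }}" password="{{ kafka_admin_password }}";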

Question

  • Is there a way to make MM2 skip corrupt records gracefully and continue replicating?
  • Are we missing any critical config to enable proper dead-letter handling or error resilience?
  • Has anyone faced a similar issue with corrupt records stalling replication in MM2 active/active setups?

This is causing a critical impact in our production setup, and we’d appreciate any guidance or best practices to handle such scenarios more gracefully.

Thanks in advance!

Hi @mmuehlbeyer,

I would appreciate your thoughts on this, as we spoke a long time back about MirrorMaker 2 creating heartbeat topics recursively in a loop in active/active mode.

Hi @Ismail

happy to be of help :slight_smile:

I assume there is no clear error message which corresponds to your error?

Best,
Michael

Hi @mmuehlbeyer

thank you for your reply; I forgot to add the log messages earlier.

Here is the log line, which is generated at roughly 2 million records per minute; our 500 MB log files fill up within about 30 seconds.

{"timestamp":"2025-05-21 11:32:21,821","level":"WARN","logger":"org.apache.kafka.connect.mirror.MirrorSourceTask","thread":"task-thread-MirrorSourceConnector-9","message":"Failure during poll.","stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}
{"timestamp":"2025-05-21 11:32:21,821","level":"WARN","logger":"org.apache.kafka.connect.mirror.MirrorSourceTask","thread":"task-thread-MirrorSourceConnector-9","message":"Failure during poll.","stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}
{"timestamp":"2025-05-21 11:32:21,821","level":"WARN","logger":"org.apache.kafka.connect.mirror.MirrorSourceTask","thread":"task-thread-MirrorSourceConnector-9","message":"Failure during poll.","stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}
{"timestamp":"2025-05-21 11:32:21,821","level":"WARN","logger":"org.apache.kafka.connect.mirror.MirrorSourceTask","thread":"task-thread-MirrorSourceConnector-9","message":"Failure during poll.","stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}
{"timestamp":"2025-05-21 11:32:21,821","level":"WARN","logger":"org.apache.kafka.connect.mirror.MirrorSourceTask","thread":"task-thread-MirrorSourceConnector-9","message":"Failure during poll.","stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}
{"timestamp":"2025-05-21 11:32:21,821","level":"WARN","logger":"org.apache.kafka.connect.mirror.MirrorSourceTask","thread":"task-thread-MirrorSourceConnector-9","message":"Failure during poll.","stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}
{"timestamp":"2025-05-21 11:32:21,821","level":"WARN","logger":"org.apache.kafka.connect.mirror.MirrorSourceTask","thread":"task-thread-MirrorSourceConnector-9","message":"Failure during poll.","stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}
{"timestamp":"2025-05-21 11:32:21,821","level":"WARN","logger":"org.apache.kafka.connect.mirror.MirrorSourceTask","thread":"task-thread-MirrorSourceConnector-9","message":"Failure during poll.","stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}
{"timestamp":"2025-05-21 11:32:21,821","level":"WARN","logger":"org.apache.kafka.connect.mirror.MirrorSourceTask","thread":"task-thread-MirrorSourceConnector-9","message":"Failure during poll.","stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}
{"timestamp":"2025-05-21 11:32:21,821","level":"WARN","logger":"org.apache.kafka.connect.mirror.MirrorSourceTask","thread":"task-thread-MirrorSourceConnector-9","message":"Failure during poll.","stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}

Need to check in detail later on.

Though, did you check this one?

There is a similar issue reported that is related to compressed messages; could this also be the case for you?

I think the issue in that one was a RecordTooLargeException

(Mirror Maker 2 connector trying to write bigger messages than origin - #3 by kuro)

whereas we are facing a completely different exception, CorruptRecordException:

"message":"Failure during poll.",
"stacktrace":"org.apache.kafka.common.errors.CorruptRecordException: Invalid magic found in record: 10"}

I am not sure whether the two issues are related.

I was just thinking about a wrongly configured compression setting, which may be related to the magic byte error.

Yeah, maybe, but AFAIK the magic byte is the same for versions 2.8.0 and 3.8.1, as it was only changed with version 4.0.

Mmh, I think so, yes. It was just an idea on how to tackle the issue by changing the setting mentioned in the post above.