We are experiencing an issue with Apache Kafka MirrorMaker 2 (MM2) in which it creates heartbeat topics recursively. This results in a growing number of heartbeat topics, which is not the intended behavior. The issue arises only when we enable reverse replication; one-way replication works fine.
Issue Observed:
- This recursive creation of heartbeat topics is leading to an excessive number of topics being generated, which is not manageable.
- While replicating from a source with version 2.8.0 to a destination with version 3.4.1, everything works fine. However, when performing reverse replication from a destination with version 3.4.1 to a source with version 2.8.0, it creates heartbeat topics recursively.
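To illustrate what we mean by recursive creation, here is a minimal sketch, assuming MM2's default naming of remote topics as `<source alias>.<topic>`. The aliases `src` and `dst` and both helper functions are hypothetical, and the printed names are illustrative rather than copied from our clusters.

```python
from typing import List


def remote_topic(source_alias: str, topic: str, separator: str = ".") -> str:
    """Name a mirrored topic would get, assuming <alias><separator><topic> naming."""
    return f"{source_alias}{separator}{topic}"


def mirror_back_and_forth(topic: str, aliases: List[str], hops: int) -> List[str]:
    """Simulate a topic being re-mirrored between clusters for `hops` rounds.

    Each hop prefixes the current name with the alias of the cluster it was
    read from, alternating through `aliases` (hypothetical alias list).
    """
    names = []
    current = topic
    for hop in range(hops):
        alias = aliases[hop % len(aliases)]
        current = remote_topic(alias, current)
        names.append(current)
    return names


if __name__ == "__main__":
    # Hypothetical aliases standing in for the two cluster names.
    for name in mirror_back_and_forth("heartbeats", ["src", "dst"], 4):
        print(name)
```

Each hop adds one more alias prefix, which is the pattern of ever-longer heartbeat topic names we are describing.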
Environment Details:
- Source Apache Kafka Version: 2.8.0
- Destination Apache Kafka Version: 3.4.1
MM2 properties being used on the source and destination clusters:
## Mirror Maker Configurations
# name of the connector, e.g. "us-west->us-east"
name = {{ source_cluster_name }}-{{ dest_cluster_name }}
# Maximum number of tasks to use for this connector
tasks.max = 4
# Setting replication factor of newly created remote topics
replication.factor = 3
errors.log.enable = true
errors.log.include.messages = true
# use ByteArrayConverter to ensure that records are not re-encoded and stay the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
## Kafka clusters aliases
clusters = {{ source_cluster_name }}, {{ dest_cluster_name }}
# upstream cluster to replicate
{{ source_cluster_name }}.bootstrap.servers = {{ source_cluster_ips }}
# downstream cluster
{{ dest_cluster_name }}.bootstrap.servers = {{ dest_cluster_ips }}
# enable and configure individual replication flows
{{ source_cluster_name }}->{{ dest_cluster_name }}.enabled = true
# whether or not to monitor source cluster for configuration changes
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.topics.configs.enabled = true
# regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
{{ source_cluster_name }}->{{ dest_cluster_name }}.topics = {{ topics_to_replicate }}
# Configure where MM2 should start replicating data from
{{ source_cluster_name }}->{{ dest_cluster_name }}.consumer.auto.offset.reset = latest
# Sync consumer group offsets
{{ source_cluster_name }}->{{ dest_cluster_name }}.exclude.internal.topics = false
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.heartbeats.enabled = true
# Enable automated consumer offset sync
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.group.offsets.enabled = true
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.checkpoints.enabled = true
offset.flush.timeout.ms = 60000
We appreciate your assistance in resolving this critical issue.