no worries thanks for the update
not sure what we could do in addition, to be honest; it looks a bit strange
just to be sure:
the connect-distributed is running on kafka 3.8.1 right?
nothing running on the legacy 2.8.1?
yes, connect-distributed is running on kafka 3.8.1
here are the properties
## Mirror Maker Configurations
# name of the connector, e.g. "us-west->us-east"
name = {{ source_cluster_name }}-{{ dest_cluster_name }}
# Maximum number of tasks to use for this connector
tasks.max = 12
num.stream.threads = 6
# Setting replication factor of newly created remote topics
replication.factor = 3
errors.log.enable = true
errors.log.include.messages = true
# use ByteArrayConverter to ensure that records are not re-encoded and stay the same
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
# enable distributed mode in 3.8.1 kafka
dedicated.mode.enable.internal.rest = true
## Kafka clusters aliases
clusters = {{ source_cluster_name }}, {{ dest_cluster_name }}
# upstream cluster to replicate
{{ source_cluster_name }}.bootstrap.servers = {{ source_cluster_ips }}
# downstream cluster
{{ dest_cluster_name }}.bootstrap.servers = {{ dest_cluster_ips }}
# enable and configure individual replication flows
{{ source_cluster_name }}->{{ dest_cluster_name }}.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.enabled = true
# whether or not to monitor source cluster for configuration changes
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.topics.configs.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.topics.configs.enabled = true
# Do not monitor the source cluster for ACL changes
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.topic.acls.enabled = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.topic.acls.enabled = false
# regex of topics to replicate, e.g. "topic1|topic2|topic3". Comma-separated lists are also supported.
{{ source_cluster_name }}->{{ dest_cluster_name }}.topics = {{ src_to_gcp_topics_to_replicate }}
{{ dest_cluster_name }}->{{ source_cluster_name }}.topics = {{ gcp_to_src_topics_to_replicate }}
# Configure from when MM2 should start replicating data
{{ source_cluster_name }}->{{ dest_cluster_name }}.consumer.auto.offset.reset = latest
{{ dest_cluster_name }}->{{ source_cluster_name }}.consumer.auto.offset.reset = latest
# Sync consumer group offsets
{{ source_cluster_name }}->{{ dest_cluster_name }}.exclude.internal.topics = false
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.heartbeats.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.exclude.internal.topics = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.emit.heartbeats.enabled = true
# Enable automated consumer offset sync
{{ source_cluster_name }}->{{ dest_cluster_name }}.sync.group.offsets.enabled = true
{{ source_cluster_name }}->{{ dest_cluster_name }}.emit.checkpoints.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.sync.group.offsets.enabled = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.emit.checkpoints.enabled = true
offset.flush.timeout.ms = 60000
### Configure MM2 to manage corrupt records - start
# retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.retry.timeout = 600000
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.retry.timeout = 600000
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.retry.delay.max.ms = 30000
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.retry.delay.max.ms = 30000
# log error context along with application logs, but do not include configs and messages
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.log.enable = true
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.log.enable = true
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.log.include.messages = false
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.log.include.messages = false
# produce error context into the Kafka topic
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.deadletterqueue.topic.name = dbeng-mm2-forward-deadletterqueue
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.deadletterqueue.topic.name = dbeng-mm2-backward-deadletterqueue
# Tolerate all errors.
{{ source_cluster_name }}->{{ dest_cluster_name }}.errors.tolerance = all
{{ dest_cluster_name }}->{{ source_cluster_name }}.errors.tolerance = all
### Configure MM2 to manage corrupt records - end
# Forward: src -> tgt
#https://kafka.apache.org/38/documentation/#georeplication:~:text=%7Bsource%7D.consumer.%7Bconsumer_config_name%7D
{{ source_cluster_name }}.consumer.max.poll.records = 20000
{{ source_cluster_name }}.consumer.receive.buffer.bytes = 33554432
{{ source_cluster_name }}.consumer.send.buffer.bytes = 33554432
{{ source_cluster_name }}.consumer.max.partition.fetch.bytes = 33554432
{{ source_cluster_name }}.producer.message.max.bytes = 37755000
{{ source_cluster_name }}.producer.compression.type = gzip
{{ source_cluster_name }}.producer.max.request.size = 26214400
{{ source_cluster_name }}.producer.buffer.memory = 524288000
{{ source_cluster_name }}.producer.batch.size = 524288
# Backward: tgt -> src
{{ dest_cluster_name }}.consumer.max.poll.records = 20000
{{ dest_cluster_name }}.consumer.receive.buffer.bytes = 33554432
{{ dest_cluster_name }}.consumer.send.buffer.bytes = 33554432
{{ dest_cluster_name }}.consumer.max.partition.fetch.bytes = 33554432
{{ dest_cluster_name }}.producer.message.max.bytes = 37755000
{{ dest_cluster_name }}.producer.compression.type = gzip
{{ dest_cluster_name }}.producer.max.request.size = 26214400
{{ dest_cluster_name }}.producer.buffer.memory = 524288000
{{ dest_cluster_name }}.producer.batch.size = 524288
# SASL Configurations
# only the destination GCP cluster has authentication/authorization enabled
{{ dest_cluster_name }}.security.protocol=SASL_PLAINTEXT
{{ dest_cluster_name }}.sasl.mechanism=SCRAM-SHA-256
{{ dest_cluster_name }}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ kafkaAdminUser }}" password="{{ kafka_admin_password }}";
so if my understanding is correct it’s still blocking you, right?
or is it just an error without harming your setup?
did you consider raising an issue here?
your kafka connect setup is fine?
Yes, it’s just an error with no impact on our setup — there’s no lag observed on the replicated topics. We can raise a JIRA and track it further once we recheck our firewall policies for port 8083 used in internode communication.
Regarding your question on the Kafka Connect setup:
Yes, it’s working as expected — we’re running the connect-mirror-maker.sh process across all 6 nodes.
Also, there’s one more issue that got sidelined — we occasionally face a corrupt record exception, which disrupts replication from the affected node until we restart all MirrorMaker processes at once. This typically happens once or twice a month.
You can refer to the main thread above for context. We’d appreciate your help in resolving this as well.
Thanks again!
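As a side note on the port 8083 point: with dedicated.mode.enable.internal.rest = true the MM2 nodes coordinate through a Connect-style REST server, which to my understanding reuses the standard Connect worker listener settings. A minimal sketch of pinning that listener explicitly in mm2.properties, assuming the default port 8083 is kept and {{ node_hostname }} is a placeholder for each node's reachable address:

# internal REST server used for cross-node task communication (dedicated mode)
dedicated.mode.enable.internal.rest = true
listeners = http://0.0.0.0:8083
# advertise an address the other MM2 nodes can reach through the firewall
rest.advertised.host.name = {{ node_hostname }}
rest.advertised.port = 8083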
understood
though you mentioned the connect distributed as well, right? where is this located?
I remember this, though I have no good explanation for it so far; it might be related to an issue with compression or similar
best
@mmuehlbeyer I was on vacation, sorry for the delayed response
we’re running the connect-mirror-maker.sh process, not the connect distributed.
no worries.
are you able to test the connect-mirror-maker one?
Yes, it’s working as expected — we’re running the connect-mirror-maker.sh process across all 6 nodes. But we occasionally face a corrupt record exception, which disrupts replication from the affected node until we restart all MirrorMaker processes at once. This typically happens once or twice a month.
You can refer to the main thread above for context. We’d appreciate your help in resolving this as well.
Here’s the MM2.service file, which is running in parallel across all 6 nodes
[Unit]
Description=MirrorMaker2
ConditionPathExists={{ mirrorMakerTwoDir }}/mm2.properties
After=network.target
[Service]
Type=simple
#User=kafka
#SyslogIdentifier=kafka-mm2
EnvironmentFile={{ mirrorMakerTwoDir }}/heap_env_file.txt
Environment="KAFKA_JMX_OPTS=-javaagent:{{ kafkaLibDir }}/jmx_prometheus_javaagent-0.17.0.jar=9191:{{ kafka_connect_file }}"
ExecStart={{ kafkaBinDir }}/connect-mirror-maker.sh {{ mirrorMakerTwoDir }}/mm2.properties
ExecStop=/usr/bin/pkill {{ kafkaBinDir }}/connect-mirror-maker.sh
#SuccessExitStatus=143
#Restart=always
#RestartSec=5
#StartLimitInterval=0
TimeoutSec=600s
[Install]
WantedBy=multi-user.target
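One hedged observation on the unit file: connect-mirror-maker.sh execs kafka-run-class.sh, which in turn execs the JVM, so a pkill on the script path may not actually match the running process. A minimal sketch of a [Service] variant that instead relies on systemd's default control-group SIGTERM handling (the JVM exits with 143 on SIGTERM, hence SuccessExitStatus):

[Service]
Type=simple
EnvironmentFile={{ mirrorMakerTwoDir }}/heap_env_file.txt
ExecStart={{ kafkaBinDir }}/connect-mirror-maker.sh {{ mirrorMakerTwoDir }}/mm2.properties
# no ExecStop: on systemctl stop, systemd sends SIGTERM to the whole control group
SuccessExitStatus=143
TimeoutStopSec=600s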
sorry, misspelled my question above
would like to know if you have the chance to test with connect-distributed
ah okay, we tried that too in the dev environment but it was not working as expected: it was creating prefixed topics and not replicating data. Here’s the config we used (see the note after the configs below regarding the prefix behavior)
MirrorMakerReplication:
{
"name": "MirrorMakerReplication",
"consumer.group.id": "mm2-msc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"clusters": "qa,dev",
"source.cluster.alias": "qa",
"target.cluster.alias": "dev",
"replication.policy.separator": ".",
"sync.topic.configs.enabled": "true",
"topics": "testing,,rabbit,abc",
"groups": ".*",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"refresh.topics.enabled": "true",
"sync.topic.acls.enabled": "true",
"refresh.groups.interval.seconds": "15",
"refresh.topics.interval.seconds": "15",
"tasks.max": "2",
"replication.factor": "2",
"offset-syncs.topic.replication.factor": "2",
"sync.topic.acls.interval.seconds": "20",
"sync.topic.configs.interval.seconds": "20",
"source.cluster.bootstrap.servers": "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092",
"source.cluster.sasl.mechanism": "SCRAM-SHA-256",
"source.cluster.security.protocol": "SASL_PLAINTEXT",
"source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"abc\";",
"source.cluster.consumer.offset.flush.timeout.ms": "20000",
"source.cluster.consumer.max.partition.fetch.bytes": "419431424",
"target.cluster.bootstrap.servers": "y.y.y.y:9092,y.y.y.y:9092,y.y.y.y:9092",
"target.cluster.sasl.mechanism": "SCRAM-SHA-256",
"target.cluster.security.protocol": "SASL_PLAINTEXT",
"target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"abc\";",
"target.cluster.producer.offset.flush.timeout.ms": "20000",
"target.cluster.producer.offset.flush.interval.ms": "10000",
"target.cluster.producer.buffer.memory": "33554432",
"target.cluster.producer.max.request.size": "419431424",
"target.cluster.max.request.size": "419431424",
"producer.max.request.size": "419431424",
"max.request.size": "419431424",
"target.cluster.producer.segment.index.bytes": "419431424",
"target.cluster.producer.socket.request.max.bytes": "419431424",
"target.cluster.producer.max.message.bytes": "419431424",
"target.cluster.producer.message.max.bytes": "419431424",
"replica.fetch.max.bytes": "419431424",
"target.cluster.producer.compression.type": "snappy",
"target.cluster.producer.cleanup.policy": "compact",
"offset.flush.timeout.ms": "20000",
"offset.flush.interval.ms": "10000",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "mm2-dlq",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.replication.factor": "2",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
MirrorMakerCheckpoint:
{
"name": "MirrorMakerCheckpoint",
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"clusters": "qa,dev",
"source.cluster.alias": "qa",
"target.cluster.alias": "dev",
"replication.factor": "2",
"tasks.max": "2",
"emit.checkpoints.interval.seconds": "10",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.policy.separator": ".",
"checkpoints.topic.replication.factor": "2",
"sync.group.offsets.enabled": "true",
"source.cluster.bootstrap.servers": "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092",
"source.cluster.sasl.mechanism": "SCRAM-SHA-256",
"source.cluster.security.protocol": "SASL_PLAINTEXT",
"source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"abc\";",
"source.cluster.offset.flush.timeout.ms": "20000",
"source.cluster.consumer.max.partition.fetch.bytes": "419431424",
"target.cluster.bootstrap.servers": "y.y.y.y:9092,y.y.y.y:9092,y.y.y.y:9092",
"target.cluster.sasl.mechanism": "SCRAM-SHA-256",
"target.cluster.security.protocol": "SASL_PLAINTEXT",
"target.cluster.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"abc\";",
"target.cluster.offset.flush.timeout.ms": "20000",
"target.cluster.producer.buffer.memory": "33554432",
"target.cluster.producer.max.request.size": "419431424",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "mm2-dlq",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.replication.factor": "2",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
MirrorMakerHeartbeat:
{
"name": "MirrorMakerHeartbeat",
"connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"clusters": "qa,dev",
"source.cluster.alias": "qa",
"target.cluster.alias": "dev",
"replication.policy.separator": ".",
"tasks.max": "2",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.factor": "3",
"heartbeats.topic.replication.factor": "3",
"emit.heartbeats.interval.seconds": "20",
"source.cluster.bootstrap.servers": "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092",
"source.cluster.sasl.mechanism": "SCRAM-SHA-256",
"source.cluster.security.protocol": "SASL_PLAINTEXT",
"source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"abc\";",
"source.cluster.offset.flush.timeout.ms": "20000",
"source.cluster.consumer.max.partition.fetch.bytes": "419431424",
"target.cluster.bootstrap.servers": "y.y.y.y:9092,y.y.y.y:9092,y.y.y.y:9092",
"target.cluster.sasl.mechanism": "SCRAM-SHA-256",
"target.cluster.security.protocol": "SASL_PLAINTEXT",
"target.cluster.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"abc\";",
"target.cluster.offset.flush.timeout.ms": "20000",
"target.cluster.producer.buffer.memory": "33554432",
"target.cluster.producer.max.request.size": "419431424",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "mm2-dlq",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.replication.factor": "2",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
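Regarding the prefixed topics mentioned above: the qa. prefix comes from MM2's DefaultReplicationPolicy, which renames remote topics with the source cluster alias and the configured separator. If identical topic names on the target were expected, one option (available since Kafka 3.0) is to switch the policy on both the source and checkpoint connectors, roughly:

"replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"

That only covers the naming side, though; it would not by itself explain data not being replicated at all.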
the above Connect setup was only for testing purposes but, tbh, we want to proceed with the connect-mirror-maker.sh process as it has already been running in production for a long time now
ok I see
let me check your provided config in detail
might take some time, will come back to you
Thank you!!! Appreciate your assistance
was not able to reproduce the issue yet
something that came to my mind regarding the skipping of corrupt records:
in general there is the possibility to create your own replication class and use it within MirrorMaker
something like this
would this be a possible workaround?
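As a general illustration of the plug-in point MM2 exposes for this (replication.policy.class), a custom policy might look like the sketch below; note that a ReplicationPolicy only controls how remote topics are named and resolved, so on its own it would not skip corrupt records, and the package and class name here are hypothetical.

package com.example.mm2;

import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

// Minimal custom policy: keep remote topic names identical to the source topic,
// wired in via replication.policy.class = com.example.mm2.FlatReplicationPolicy
public class FlatReplicationPolicy extends DefaultReplicationPolicy {
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }
}

(For the specific case of un-prefixed names, the stock IdentityReplicationPolicy mentioned earlier already behaves this way.)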
Thanks for sharing, but I am not sure how to implement this as I haven’t built anything like this before.
I see
so the obvious question: is an upgrade to the latest Kafka version possible on any of the clusters (looking at the 2.x Kafka cluster)?
unfortunately we cannot upgrade; as we are moving to the cloud, there’s no plan to upgrade the on-prem clusters
I see
one other option I could think of (not sure if I already mentioned it):
exclude the topics which are causing the issues and transfer their data another way:
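For that route, MM2 also has a per-flow topics.exclude setting (the newer name for topics.blacklist), so the problematic topics could be dropped from the flow without touching the include regex. A minimal sketch against the mm2.properties above, with problem-topic as a hypothetical topic name:

# stop mirroring the problematic topic in the forward flow
{{ source_cluster_name }}->{{ dest_cluster_name }}.topics.exclude = problem-topic
# note: setting this overrides the default excludes (internal/replica/__ topics),
# so those patterns may need to be re-added explicitly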
Yeah, that works, thanks for the suggestions. The only concern is that if we create an application to consume and produce as an alternative, we won’t be able to replicate consumer offsets to the destination, since MM2 handles that replication. On the other hand, if we go with Kafka Connect, it might be too much overhead to manage just for one or two topics; we performed a POC last month in the dev environment and it wasn’t functioning as expected, as described above.