Mirror Maker 2 connector trying to write bigger messages than the origin

I’m trying to replicate data between two Kafka clusters using Mirror Maker 2 with Kafka Connect. The connector works fine with all the topics I’ve tried except one, where every task throws the following error:

org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
	at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:290)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:351)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

Before hitting this error, I had already increased producer.max.request.size on my Kafka Connect workers from the default to producer.max.request.size=15000000.

The source topic’s max.message.bytes is 1048588, and the target topic has exactly the same configuration. If I manually raise the target topic to max.message.bytes=15000000, the replication works fine.
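For reference, this is the kind of manual change I mean (a sketch only: the topic name assumes the default replication policy with the _mm_ separator, and the bootstrap servers and SASL client properties file are placeholders):

# Workaround only: raise the topic-level limit on the target cluster
kafka-configs.sh --bootstrap-server TARGET_BOOTSTRAP_SERVERS \
  --command-config client-sasl.properties \
  --entity-type topics --entity-name prod_mm_topic \
  --alter --add-config max.message.bytes=15000000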

I’m trying to understand how this behaviour is possible and why Mirror Maker tries to write a bigger message to the target topic than the source message. The first thing that came to my mind was the compression.type of the source topic, so I tried to force the target to use gzip compression, but it didn’t work.

Do you have any ideas about this?

Here are the worker and connector configurations:

Worker:

security.protocol=SASL_SSL
bootstrap.servers=BOOTSTRAP_SERVERS
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="SASL_JAAS_CONFIG_USERNAME_SBX" password="SASL_JAAS_CONFIG_PASSWORD_SBX";
sasl.mechanism=SCRAM-SHA-512
rest.advertised.host.name=CONNECT_REST_ADVERTISED_HOST_NAME
rest.port=8083
group.id=kafka_connect_mm
config.storage.topic=_connect_mm_configs
offset.storage.topic=_connect_mm_offsets
status.storage.topic=_connect_mm_status
config.storage.replication.factor=STORAGE_REPLICATION_FACTOR
offset.storage.replication.factor=STORAGE_REPLICATION_FACTOR
status.storage.replication.factor=STORAGE_REPLICATION_FACTOR
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
plugin.path=/usr/share/java/,/usr/share/confluent-hub-components/

consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="******" password="******";
consumer.sasl.mechanism=SCRAM-SHA-512

producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="******" password="*******";
producer.sasl.mechanism=SCRAM-SHA-512
producer.max.request.size=15000000

Connector:

"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"topic.creation.default.partitions": "10",
"target.cluster.sasl.jaas.config": "****** ",
"tasks.max": "24",
"source.cluster.alias": "prod",
"target.cluster.security.protocol": "SASL_SSL",
"status.storage.replication.factor": "1",
"_topic.creation.enable": "true",
"topic.creation.default.replication.factor": "1",
"replication.policy.separator": "_mm_",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"clusters": "sandbox, prod",
"topic.creation.default.compression.type": "gzip",
"topics": "topic",
"target.cluster.sasl.mechanism": "SCRAM-SHA-512",
"source.cluster.sasl.jaas.config": "******* ",
"source.cluster.bootstrap.servers": "*******",
"source.cluster.sasl.mechanism": "SCRAM-SHA-512",
"config.storage.replication.factor": "1",
"target.cluster.alias": "sandbox",
"name": "mirror-maker",
"target.cluster.bootstrap.servers": "******",
"offset.storage.replication.factor": "1",
"_replication.factor": "1",
"source.cluster.security.protocol": "SASL_SSL",
"_topic.creation.default.cleanup.policy": "delete"

Thanks a lot in advance!
Brais

Hi Brais,
try setting producer.compression.type for the worker. Mirror Maker does not inherit the compression type of the source message.
If you set compression.type only at the topic level, the compression is done by the broker. But if the producer sends an uncompressed message that exceeds the max message size, the broker rejects it before compressing.
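As a minimal sketch, this is the only line that needs to change in the worker properties shown above:

# Compress on the producer side, so the batch is already gzip-compressed
# when the target broker checks it against max.message.bytes
producer.compression.type=gzip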
Best regards
Daniel


Hi Daniel,

You are absolutely right!

I was configuring the compression at the topic level, so the broker rejected the message as too big before compressing it, hence the error I showed. I assumed that if the compressed message fit in the topic, the broker would accept it, but that doesn’t seem to be the case.

Setting the compression in the worker’s producer solved the problem. To be able to set this property only for the connectors that actually need compression, I allowed connectors to override it:

Worker config:

connector.client.config.override.policy=All

Connector config:

"producer.override.compression.type": "gzip",
"topic.creation.default.compression.type": "producer"
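As a sanity check, something like the following can confirm the setting on the target cluster (again a sketch: the mirrored topic name assumes the default replication policy, and the bootstrap servers and client properties file are placeholders):

kafka-configs.sh --bootstrap-server TARGET_BOOTSTRAP_SERVERS \
  --command-config client-sasl.properties \
  --entity-type topics --entity-name prod_mm_topic \
  --describe
# Expect compression.type=producer, so the broker keeps the gzip batches as the producer sent them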

It’s a shame that Mirror Maker is not able to detect the source compression on the fly :cry:.

Thanks a lot for your help and have a great day!
Brais
