Where should I change max.request.size in Confluent?

Hi All,

I am building a MySQL CDC pipeline using the Debezium source connector, with the hdfs-sink connector writing to HDFS. Everything was fine until my source connector failed with a RecordTooLargeException. I know I need to raise max.request.size above the record size, but I am not sure where to do that. I made the change in distributed.properties, but it did not work for me. Please help me understand where Confluent picks up the producer config.

curl -H "Accept:application/json" x.x.x.x:8083/connectors/exact-test/status

{"name":"exact-test","connector":{"state":"RUNNING","worker_id":"x.x.x.x.:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"x.x.x.x:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:282)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:336)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1522947 bytes when serialized which is larger than 1048576,

Can anyone help me with this?

To override producer values in Kafka Connect use the producer. prefix in the Kafka Connect worker config (i.e. producer.max.request.size). You can also override it per connector.
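A minimal sketch of the worker-level change, assuming the worker is started from a properties file such as connect-distributed.properties. The WORKER_CONFIG variable and the 2097152 (2 MiB) value are illustrative assumptions; the variable defaults to a scratch file so the snippet can be tried safely before pointing it at the real file:

```shell
# WORKER_CONFIG is an assumed variable: set it to the properties file your
# Connect worker is actually started with. It defaults to a scratch file.
WORKER_CONFIG="${WORKER_CONFIG:-$(mktemp)}"

# Append the producer override. The value is illustrative; choose one
# larger than the biggest serialized record you expect.
cat >> "$WORKER_CONFIG" <<'EOF'
producer.max.request.size=2097152
EOF

# Show the line that was written.
grep -n '^producer\.max\.request\.size=' "$WORKER_CONFIG"
```

The worker reads this file only at startup, so restart the Connect worker after the change.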

Keep in mind that if you change the producer max.request.size setting, you will need to change the corresponding topic max.message.bytes setting. You may also need to change the consumer fetch.max.bytes and max.partition.fetch.bytes settings. Here are links to the doc for these settings.

max.message.bytes

fetch.max.bytes

max.partition.fetch.bytes
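For the topic side, something like the following could work. It is only a sketch: the broker address, the topic name (EXACT.exact_test.my_table is a hypothetical Debezium topic), and the 2 MiB value are assumptions, and it prints the kafka-configs command for review rather than running it. The tool is called kafka-configs.sh in Apache Kafka distributions, and older releases take --zookeeper instead of --bootstrap-server for topic configs.

```shell
# All three values are placeholders; replace them with your own.
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"
TOPIC="${TOPIC:-EXACT.exact_test.my_table}"   # hypothetical topic name
MAX_BYTES="${MAX_BYTES:-2097152}"             # illustrative 2 MiB limit

# Build the kafka-configs command that raises max.message.bytes on a topic.
topic_config_cmd() {
  printf 'kafka-configs --bootstrap-server %s --alter --entity-type topics --entity-name %s --add-config max.message.bytes=%s\n' \
    "$1" "$2" "$3"
}

# Print the command so it can be checked before it is run by hand.
topic_config_cmd "$BOOTSTRAP" "$TOPIC" "$MAX_BYTES"
```

If the HDFS sink later fails to read the large records, the consumer settings can be raised in the worker config with the consumer. prefix in the same way as the producer setting.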

Hi,
Thanks a lot for your reply, but unfortunately it is not working for me.
I have tried the settings below across these files:

producer.properties : max.request.size = 1522947 replica.fetch.max.bytes = 1522947
consumer.properties : max.partition.fetch.bytes=1522947 fetch.message.max.bytes = 1522947
server.properties : message.max.bytes=1522947 replica.fetch.max.bytes=4194304
connect-standalone.properties : producer.max.request.size=1522947 , consumer.max.partition.fetch.bytes=1522947 and consumer.fetch.message.max.bytes = 1522947
connect-distributed.properties : producer.max.request.size=1522947 , consumer.max.partition.fetch.bytes=1522947 and consumer.fetch.message.max.bytes = 1522947

Any idea what I am doing wrong?

Regards,
Ayan

Hi,

Since I was not able to change the values, I tried overriding them on a per-connector basis, and I hit the issue below.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" datanode26-htrunk:8083/connectors/ -d '{ "name": "exact-test",  "config": {  "connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1",  "database.hostname": "x.x.x.x",  "database.port": "3306","database.user": "cdc_poc","database.password": "xxxxx", "database.server.id": "184055",  "database.server.name": "EXACT",  "database.include.list": "exact_test",  "database.history.kafka.bootstrap.servers": "xxxx:9092",  "database.history.kafka.topic": "schema-changes.exact_test" , "producer.override.max.request.size": "15728640"  }}'
HTTP/1.1 400 Bad Request
Date: Wed, 07 Apr 2021 15:03:48 GMT
Content-Type: application/json
Content-Length: 319
Server: Jetty(9.4.24.v20191120)

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nThe 'None' policy does not allow 'max.request.size' to be overridden in the connector configuration.\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

Can anyone please tell me in which file I should set the parameter below?
connector.client.config.override.policy=All

Regards,
Ayan

You need to set connector.client.config.override.policy=All in the worker config. See Connector-level producer/consumer configuration overrides.
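As a rough sketch, the policy is a single line in the same properties file that the Connect worker is started with (not producer.properties, consumer.properties, or server.properties). WORKER_CONFIG is an assumed variable that defaults to a scratch file so the snippet is safe to try:

```shell
# WORKER_CONFIG is an assumed variable: set it to the properties file passed
# to connect-distributed (or connect-standalone) when the worker starts.
WORKER_CONFIG="${WORKER_CONFIG:-$(mktemp)}"

# Allow connector configs to carry producer.override.* and
# consumer.override.* settings.
cat >> "$WORKER_CONFIG" <<'EOF'
connector.client.config.override.policy=All
EOF

# Confirm the policy line is present.
grep -n '^connector\.client\.config\.override\.policy=' "$WORKER_CONFIG"
```

After the worker is restarted with this file, a connector config containing producer.override.max.request.size should pass validation.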

I am extremely sorry about this. I am aware of the property, but I still do not understand which file is the worker config file here.

I am registering the MySQL source connector directly with a curl command, so I am not using a file to do it.

Under $CONFLUENT_HOME/etc/kafka I see several files. Is it one of these?
#producer.properties
#server.properties
#consumer.properties
#connect-distributed.properties
#connect-standalone.properties

I have even tried adding the property to all three files (producer, server and consumer).

Regards,
Ayan

Hi,

Please let me know: since I registered my connector using REST, which file should I change the parameters in?

Regards,
Ayan

Hi Ayan,

To learn more about configuring connect workers, you can review this related documentation:

https://docs.confluent.io/home/connect/userguide.html#configuring-and-running-workers

The standard Confluent Platform distribution includes two sample Connect worker configuration files for a distributed Connect deployment:

/etc/schema-registry/connect-avro-distributed.properties
/etc/kafka/connect-distributed.properties
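The worker uses whichever properties file is passed to it on the command line at startup, so it can help to check what each sample file currently contains. The helper below is a hypothetical sketch (show_worker_overrides is not a Confluent tool) that assumes the two sample files sit under $CONFLUENT_HOME and lists any producer., consumer., or override-policy lines found in them:

```shell
# Hypothetical helper: print client overrides and the override policy
# found in the two sample distributed worker files under a given install.
show_worker_overrides() {
  home="$1"
  for f in "$home/etc/kafka/connect-distributed.properties" \
           "$home/etc/schema-registry/connect-avro-distributed.properties"; do
    # Skip sample files that are not present in this install.
    [ -f "$f" ] || continue
    # -H prefixes each match with the file name it came from.
    grep -H -E '^(producer\.|consumer\.|connector\.client\.config\.override\.policy=)' "$f" || true
  done
  return 0
}

# Falls back to the current directory when CONFLUENT_HOME is not set.
show_worker_overrides "${CONFLUENT_HOME:-.}"
```

If neither file shows the settings you added, the edits most likely went into a file the worker does not read.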

I hope this helps.

Dave