Where should I change max.request.size in Confluent?

Hi All,

I am working on a MySQL CDC pipeline using the Debezium connector, with the sink writing to HDFS via the hdfs-sink connector. Everything was fine until my source connector failed with a record-too-large exception. I know I need to increase max.request.size beyond that record size, but I am not sure where to do that. I have made the change in distributed.properties but it did not work for me. Please help me understand where Confluent picks up the producer config.

curl -H "Accept:application/json" x.x.x.x:8083/connectors/exact-test/status
{"name":"exact-test","connector":{"state":"RUNNING","worker_id":"x.x.x.x.:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"x.x.x.x:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
	at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:282)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:336)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1522947 bytes when serialized which is larger than 1048576,

Can anyone help me with this?

To override producer values in Kafka Connect, use the producer. prefix in the Kafka Connect worker config (e.g. producer.max.request.size). You can also override it per connector.

Keep in mind that if you change the producer max.request.size setting, you will need to change the corresponding topic max.message.bytes setting. You may also need to change the consumer fetch.max.bytes and max.partition.fetch.bytes settings. Here are links to the docs for these settings, followed by a minimal example.

max.message.bytes

fetch.max.bytes

max.partition.fetch.bytes
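
As a minimal sketch (the 2 MB value and the topic name are placeholders, not recommendations), the worker-side settings go in the Connect worker properties file, and the matching topic limit is raised with the kafka-configs CLI:

# Connect worker config (e.g. connect-distributed.properties); restart the worker after editing
producer.max.request.size=2097152
# Only needed if consumers/sink connectors must also read the larger records
consumer.fetch.max.bytes=2097152
consumer.max.partition.fetch.bytes=2097152

# Matching topic-level limit (placeholder topic name; on recent Kafka versions)
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics \
  --entity-name EXACT.exact_test.customers --add-config max.message.bytes=2097152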


Hi,
Thanks a lot for your reply, but unfortunately it is not working for me.
I have tried the settings below across the following files.

producer.properties: max.request.size=1522947, replica.fetch.max.bytes=1522947
consumer.properties: max.partition.fetch.bytes=1522947, fetch.message.max.bytes=1522947
server.properties: message.max.bytes=1522947, replica.fetch.max.bytes=4194304
connect-standalone.properties: producer.max.request.size=1522947, consumer.max.partition.fetch.bytes=1522947, consumer.fetch.message.max.bytes=1522947
connect-distributed.properties: producer.max.request.size=1522947, consumer.max.partition.fetch.bytes=1522947, consumer.fetch.message.max.bytes=1522947

Any idea what I am doing wrong?

Regards,
Ayan

Hi,

Since I was not able to change the values, I thought of overriding them on a per-connector basis, and hence I ran into the issue below.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" datanode26-htrunk:8083/connectors/ -d '{ "name": "exact-test",  "config": {  "connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1",  "database.hostname": "x.x.x.x",  "database.port": "3306","database.user": "cdc_poc","database.password": "xxxxx", "database.server.id": "184055",  "database.server.name": "EXACT",  "database.include.list": "exact_test",  "database.history.kafka.bootstrap.servers": "xxxx:9092",  "database.history.kafka.topic": "schema-changes.exact_test" , "producer.override.max.request.size": "15728640"  }}'
HTTP/1.1 400 Bad Request
Date: Wed, 07 Apr 2021 15:03:48 GMT
Content-Type: application/json
Content-Length: 319
Server: Jetty(9.4.24.v20191120)

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nThe 'None' policy does not allow 'max.request.size' to be overridden in the connector configuration.\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

Can anyone please tell me in which file I should change the parameter below?
connector.client.config.override.policy=All

Regards,
Ayan

You need to set connector.client.config.override.policy=All in the worker config. See Connector-level producer/consumer configuration overrides.
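
As a rough sketch (the values are taken from your earlier curl request, not recommendations): put the policy in the Connect worker properties file, restart the worker, and the per-connector override you already tried is then accepted. You can also confirm it against a running worker with the validate endpoint that the 400 error above points at (the response will additionally list errors for any missing required connector fields):

# In the Connect worker config (e.g. connect-distributed.properties), then restart the worker
connector.client.config.override.policy=All

# After the restart, the override from your earlier curl request is no longer rejected:
#   "producer.override.max.request.size": "15728640"

# Optional check against a running worker (adjust host/port):
curl -s -X PUT -H "Content-Type:application/json" \
  x.x.x.x:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate \
  -d '{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "producer.override.max.request.size": "15728640" }'
# With the default "None" policy this reports an error for producer.override.max.request.size;
# with "All" it does not.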

I am extremely sorry about this; I am aware of the property, but I am still not able to understand which file is the worker config file here.

Here I am registering the MySQL source connector directly using a curl command, and hence not using a file to do it.

Under $CONFLUENT_HOME/etc/kafka I see a couple of files; is it any one of these?
producer.properties
server.properties
consumer.properties
connect-distributed.properties
connect-standalone.properties

I have even tried adding the property in all three of those files (producer, server, and consumer).

Regards,
Ayan

Hi,

Given that I have registered my connector using REST, which file should I change the parameters in?

Regards,
Ayan

Hi Ayan,

To learn more about configuring Connect workers, you can review this related documentation:

https://docs.confluent.io/home/connect/userguide.html#configuring-and-running-workers

The standard Confluent Platform distribution includes two sample Connect worker configuration files for a Connect distributed deployment; a quick way to confirm which one your worker uses follows the list:

/etc/schema-registry/connect-avro-distributed.properties
/etc/kafka/connect-distributed.properties
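
If it is unclear which of those files your running worker is actually using, one quick check (just a sketch) is to look at the command line of the running Connect process, since the worker properties file is passed to connect-distributed as an argument:

# The ConnectDistributed process shows the worker properties file it was started with
ps -ef | grep -i connectdistributed

# Typical invocation (the path is whatever your service or script passes in):
# connect-distributed /etc/kafka/connect-distributed.properties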

I hope this helps.

Dave

Hi there,

Just reading through this and was wondering if you ever got to the bottom of it?

I’m currently having a similar issue: I want to increase the max request size to allow bigger batches through from Debezium, to fix a bottleneck in our system. I’ve updated my settings as described above and set the cluster and connector settings, but the metric kafka_connect_connect_metrics_request_size_max doesn’t appear to increase. To test whether I get the same value before the connector is applied, I tried removing the connector and still got the same value for this metric, which is 99, but I’m not sure what that really means. Is it measured in KB or MB?

We’re running Kafka Connect in Kubernetes using the Docker image confluentinc/cp-kafka-connect:6.2.1. When I delete the pods and they are recreated, the metric above starts at an increased size and then caps off at the same value every time. I can’t find the source code that shows how this metric is set, so I’m unsure what might be causing it, but in my attempts to fix the issue I’ve changed the following:
Kafka Connect Config:

  CONNECT_OFFSET_FLUSH_INTERVAL_MS: "10000"
  CONNECT_OFFSET_FLUSH_TIMEOUT_MS: "60000"
  CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: All

  CONNECT_BUFFER_MEMORY: "335544320"
  CONNECT_MAX_REQUEST_SIZE: "104857600"
  CONNECT_RECEIVE_BUFFER_BYTES: "26214400"
  CONNECT_SEND_BUFFER_BYTES: "104857600"

  CONNECT_PRODUCER_BUFFER_MEMORY: "335544320"
  CONNECT_PRODUCER_MAX_REQUEST_SIZE: "104857600"
  CONNECT_PRODUCER_RECEIVE_BUFFER_BYTES: "26214400"
  CONNECT_PRODUCER_SEND_BUFFER_BYTES: "104857600"

  CONNECT_PRODUCER_OVERRIDE_BUFFER_MEMORY: "335544320"
  CONNECT_PRODUCER_OVERRIDE_MAX_REQUEST_SIZE: "104857600"
  CONNECT_PRODUCER_OVERRIDE_RECEIVE_BUFFER_BYTES: "26214400"
  CONNECT_PRODUCER_OVERRIDE_SEND_BUFFER_BYTES: "104857600"

Connector Settings (Debezium):

    "producer.override.buffer.memory"        = "335544320"
    "producer.override.compression.type"     = "lz4"
    "producer.override.max.request.size"     = "104857600"
    "producer.override.receive.buffer.bytes" = "26214400"
    "producer.override.send.buffer.bytes"    = "104857600"

Broker Settings:

socket.request.max.bytes=104857600
message.max.bytes = 5242880
replica.fetch.max.bytes = 5242880
socket.receive.buffer.bytes=102400
socket.send.buffer.bytes=102400
group.max.session.timeout.ms=300000

Topic Settings:

# internal topics (config, offset, status)
'max.message.bytes' => 104857600,

# db history topic
'max.message.bytes' => 104857600,

# output topic
'max.message.bytes' => 104857600,

Hopefully I’m not falling down a bit of a rabbit hole here, as I’m assuming kafka_connect_connect_metrics_request_size_max is the absolute maximum size of a producer message that can be sent from the connectors in Kafka Connect? It would be nice to see exactly what sets this metric, so any help would be appreciated.

Thanks in advance

@AlexeiBlue - have you been able to get this to work?

I only set the variable in the Kubernetes manifests:

CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: All

I restarted the Connect pods and then set the connector configuration in Control Center, but it does not pick up the changes. According to the logs, the Kafka Connect connector ignores all of the producer.override and consumer.override options.

@rmoff

Would it be possible to expand the documentation with respect to Kubernetes deployment and advanced configuration?