S3 Sink Connector - fails to deserialize large JSON messages

Hello!

I’m trying to archive messages from a topic to S3; the topic uses JSON format.

The messages we receive are sometimes large; not most of them, but it happens roughly 100 times a day. The topics are configured to accept a maximum message size of 20 MB.

The exception reported in the DLQ is:

...
__connect.errors.stage	VALUE_CONVERTER
__connect.errors.class.name	org.apache.kafka.connect.json.JsonConverter
__connect.errors.exception.class.name	org.apache.kafka.connect.errors.DataException
__connect.errors.exception.message	Converting byte[] to Kafka Connect data failed due to serialization error: 
__connect.errors.exception.stacktrace	org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
 at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:326)
 at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:536)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
 at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
 at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.exc.StreamConstraintsException: String length (5046272) exceeds the maximum length (5000000)
 at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
 at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
 ... 18 more
Caused by: com.fasterxml.jackson.core.exc.StreamConstraintsException: String length (5046272) exceeds the maximum length (5000000)
 at com.fasterxml.jackson.core.StreamReadConstraints.validateStringLength(StreamReadConstraints.java:290)
 at com.fasterxml.jackson.core.util.ReadConstrainedTextBuffer.validateStringLength(ReadConstrainedTextBuffer.java:27)
 at com.fasterxml.jackson.core.util.TextBuffer.finishCurrentSegment(TextBuffer.java:931)
 at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2584)
 at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishAndReturnString(UTF8StreamJsonParser.java:2560)
 at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:335)
 at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer._deserializeContainerNoRecursion(JsonNodeDeserializer.java:572)
 at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:100)
 at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:25)
 at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
 at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4867)
 at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3233)
 at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
 ... 19 more

I thought it might be related to Jackson’s DEFAULT_MAX_STRING_LEN, but after upgrading to confluent-community-7.7.1 the issue is still there.

It looks like confluent-community-7.7.1 is using Jackson 2.16.0 and 2.16.2, which increased DEFAULT_MAX_STRING_LEN to 20 MB (exactly what we need).

The exception text pattern doesn’t seem to come from com.fasterxml.jackson.core.StreamReadConstraints.validateStringLength in 2.16+, as the message format is slightly different there.
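
For context, here is a minimal standalone sketch of the Jackson mechanism involved (plain Jackson, not the connector’s JsonConverter, which as far as I can tell doesn’t expose this setting). The limit is counted in characters per string value; 2.15 defaults to 5,000,000 and 2.16 to 20,000,000, and it can be raised explicitly like this:

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StringLimitDemo {
    public static void main(String[] args) throws Exception {
        // Raise the per-string limit beyond the 2.15 default of 5,000,000 chars.
        JsonFactory factory = JsonFactory.builder()
                .streamReadConstraints(StreamReadConstraints.builder()
                        .maxStringLength(20_000_000) // counted in characters, not bytes
                        .build())
                .build();
        ObjectMapper mapper = new ObjectMapper(factory);

        // Under the 2.15 default, this 6,000,000-char value would throw the same
        // StreamConstraintsException we see in the DLQ; with the raised limit it parses.
        String doc = "{\"payload\":\"" + "x".repeat(6_000_000) + "\"}";
        System.out.println(mapper.readTree(doc).get("payload").textValue().length());
    }
}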

Has anyone had a similar problem?

Kind regards,
Marcin

Hello Marcin,

Which version of the S3 sink connector plugin are you using?

The latest (10.5.16) already has Jackson 2.16.1 bundled:

jackson-annotations-2.16.1.jar
jackson-core-2.16.1.jar
jackson-databind-2.16.1.jar
jackson-dataformat-cbor-2.16.1.jar
jackson-dataformat-csv-2.16.1.jar

I’ve tested with a 15 MB message and it works fine. Are you also sure that the messages are really not larger than 20 MB?
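
If it helps, a rough way to double-check the actual on-wire value sizes is a throwaway consumer that reports the largest serialized value it sees (the topic name and bootstrap address below are placeholders, adjust for your cluster):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MaxValueSizeCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "size-check");                  // throwaway group
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("max.partition.fetch.bytes", 20 * 1024 * 1024); // allow 20 MB records

        long maxSeen = 0;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-json-topic"));    // placeholder topic
            for (int i = 0; i < 100; i++) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    maxSeen = Math.max(maxSeen, record.serializedValueSize());
                }
            }
        }
        System.out.println("Largest value size seen (bytes): " + maxSeen);
    }
}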

Hi!

Thanks! We’ve upgraded to the latest version and in the end it solved the issue. Directly after upgrading we probably still had older Jackson jars somewhere on the classpath, which the S3 sink connector may have picked up instead of 2.16.1. Now I see all large messages archived on S3 and the DLQ is empty.
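
In case anyone hits the same thing: a quick sanity check we could have used is a small class run with the worker’s classpath that prints which jackson-core is actually resolved and its default limit (it won’t see through Connect’s plugin-isolation classloaders, so treat it as a rough check only):

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.core.json.PackageVersion;

public class WhichJackson {
    public static void main(String[] args) {
        System.out.println("jackson-core version: " + PackageVersion.VERSION);
        System.out.println("loaded from: "
                + JsonFactory.class.getProtectionDomain().getCodeSource().getLocation());
        // 5,000,000 on 2.15.x, 20,000,000 from 2.16.0 onward.
        System.out.println("default maxStringLength: "
                + StreamReadConstraints.defaults().getMaxStringLength());
    }
}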

Kind regards,
Marcin