Hello!
I’m trying to archive messages from a topic to S3; the messages are in JSON format.
Some of the messages we receive are large; it’s not most of them, but it happens roughly 100 times a day. The topics are configured to accept a maximum message size of 20MB.
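For context, the sink is configured along these lines (just a sketch with placeholders; our exact settings differ, and the DLQ-related settings are inferred from the error headers below):

name=s3-archive-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=<topic>
s3.bucket.name=<bucket>
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
errors.tolerance=all
errors.deadletterqueue.topic.name=<dlq-topic>
errors.deadletterqueue.context.headers.enable=true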
The exception reported in the DLQ is:
...
__connect.errors.stage VALUE_CONVERTER
__connect.errors.class.name org.apache.kafka.connect.json.JsonConverter
__connect.errors.exception.class.name org.apache.kafka.connect.errors.DataException
__connect.errors.exception.message Converting byte[] to Kafka Connect data failed due to serialization error:
__connect.errors.exception.stacktrace org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:326)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:536)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.exc.StreamConstraintsException: String length (5046272) exceeds the maximum length (5000000)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
... 18 more
Caused by: com.fasterxml.jackson.core.exc.StreamConstraintsException: String length (5046272) exceeds the maximum length (5000000)
at com.fasterxml.jackson.core.StreamReadConstraints.validateStringLength(StreamReadConstraints.java:290)
at com.fasterxml.jackson.core.util.ReadConstrainedTextBuffer.validateStringLength(ReadConstrainedTextBuffer.java:27)
at com.fasterxml.jackson.core.util.TextBuffer.finishCurrentSegment(TextBuffer.java:931)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2584)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishAndReturnString(UTF8StreamJsonParser.java:2560)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:335)
at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer._deserializeContainerNoRecursion(JsonNodeDeserializer.java:572)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:100)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:25)
at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4867)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3233)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
... 19 more
I thought it might be related to Jackson’s DEFAULT_MAX_STRING_LEN, but after upgrading to confluent-community-7.7.1 the issue is still there.
It looks like confluent-community-7.7.1 ships Jackson 2.16 (2.16.0/2.16.2), which increased DEFAULT_MAX_STRING_LEN to 20MB (exactly what we need).
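As a possible workaround I also considered raising the limit explicitly instead of relying on the defaults. A minimal sketch of what I had in mind, assuming the converter’s ObjectMapper picks up the global Jackson defaults (i.e. it doesn’t build its own StreamReadConstraints) and that this code runs in the same classloader before any JsonFactory is created:

import com.fasterxml.jackson.core.StreamReadConstraints;

// Hypothetical helper: would need to run early in the worker JVM,
// before any JsonFactory/ObjectMapper is constructed, because factories
// snapshot the default constraints at construction time.
public class RaiseJacksonStringLimit {
    public static void apply() {
        StreamReadConstraints.overrideDefaultStreamReadConstraints(
            StreamReadConstraints.builder()
                .maxStringLength(20_000_000) // match the 20MB topic limit
                .build());
    }
}

One caveat I’m aware of: with Connect’s plugin isolation, a plugin can load its own copy of jackson-core, and an override applied to a different classloader’s copy won’t reach it.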
Also, the exception text doesn’t look like what com.fasterxml.jackson.core.StreamReadConstraints.validateStringLength produces in 2.16+ (the message format changed slightly there), which makes me suspect an older jackson-core is still being loaded somewhere.
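To check which jackson-core the worker actually uses, I was going to run something like this on the worker/plugin classpath (the class name JacksonProbe is just for illustration):

import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.core.json.PackageVersion;

public class JacksonProbe {
    public static void main(String[] args) {
        // Which jackson-core is loaded, and from which jar?
        System.out.println("jackson-core version: " + PackageVersion.VERSION);
        System.out.println("default maxStringLength: "
                + StreamReadConstraints.defaults().getMaxStringLength());
        System.out.println("loaded from: "
                + StreamReadConstraints.class.getProtectionDomain()
                        .getCodeSource().getLocation());
    }
}

On 2.15.x this should report a default maxStringLength of 5000000, which is exactly the limit in the stack trace above.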
Has anyone had a similar problem?
Kind regards,
Marcin