JsonConverter issue for UTF-8 conversion

Hi,

I am facing a serialization error for a few records on a topic, while the rest of the records on the same topic are serialized and transferred through the connector successfully.

Worker properties for the converters:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

Error message:

Converting byte[] to Kafka Connect data failed due to serialization error:  (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:366)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Invalid UTF-8 start byte 0x90

Can I change the encoding at some stage in Kafka? Or what would be a possible solution for this?

Thanks,
Shubham

The zero suggests that Schema Registry is used, which prepends some metadata to the ‘actual’ bytes. You are now using a converter that assumes the bytes can be put into a String, which can then be turned into JSON.
I think the problem should be solved if you switch the configuration to use JsonSchemaConverter (schema-registry/JsonSchemaConverter.java at cc41eec5be2ebd6ea8ce12a9eecd112ae669cf98 · confluentinc/schema-registry · GitHub).
(You might need to add the Schema Registry dependency.)
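
As a rough sketch of what that switch might look like in the converter properties, assuming the records really were written in the Schema Registry JSON Schema format. The registry URL below is a placeholder and not something taken from this thread. The stack trace fails in convertValue, so only the value converter is changed here:

```properties
# Replace the plain JsonConverter for values with the Schema Registry aware one
value.converter=io.confluent.connect.json.JsonSchemaConverter
# Placeholder URL (assumption); point this at your own Schema Registry
value.converter.schema.registry.url=http://localhost:8081
```

If the keys are plain JSON or strings, the key converter can stay as it is.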

Thanks. Is there any other solution? I can’t modify the source connector that brings the data into Kafka.

I think I wasn’t clear, sorry. You don’t need to change the source connector config. The data itself is fine, but it doesn’t match the converter you currently have configured.

Converters are configured in the source connector only, so if I need to make changes, they will be in the source connector, right?

Sorry, I was wrong. The error seems to suggest that an encoding other than UTF-8 is used, see https://try2explore.com/questions/10014730. But the default should be UTF-8, and it doesn’t seem to have been changed in schema-registry/KafkaJsonDeserializer.java at cc41eec5be2ebd6ea8ce12a9eecd112ae669cf98 · confluentinc/schema-registry · GitHub. So I’m not really sure what is going wrong or how to fix it.
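
This doesn't fix the encoding itself, but since the stack trace goes through WorkerSinkTask and RetryWithToleranceOperator, Kafka Connect's error-handling options for sink connectors might at least keep the task running while the bad records are looked at. A minimal sketch, assuming these are added to the sink connector configuration. The dead letter queue topic name is a made-up placeholder:

```properties
# Skip records that fail conversion instead of failing the task
errors.tolerance=all
# Hypothetical topic name (assumption); records that fail conversion are written here
errors.deadletterqueue.topic.name=dlq-my-sink
# Add headers describing the failure to each dead-lettered record
errors.deadletterqueue.context.headers.enable=true
# Also log each failure
errors.log.enable=true
errors.log.include.messages=true
```

The records that end up in the dead letter queue topic keep their original bytes, so they can be inspected there to see what encoding they actually use.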

Okay, no problem. Thanks for confirming.
