Confluent Cloud Managed Kafka Connect -
I have an application to produce protobuffer based serialized data to a topic, and I have also included:
“value.converter”: “io.confluent.connect.protobuf.ProtobufConverter”,
“value.converter.basic.auth.credentials.source”: “USER_INFO”,
“value.converter.schema.registry.basic.auth.user.info”: “----”,
“value.converter.schema.registry.url”: “—”,
“value.converter.schemas.enable”: “true”,
but the connector still fails with the following stack:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:166)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:525)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:503)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:339)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:227)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:282)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:326)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:525)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
… 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing message to JSON in topic nsr_raw
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
To my understanding, it seems like it is using a json convertor instead of a protobuffer converter to convert topic msg to connect data.