Hello,
I am first time setting the http sink connector to send an JSON message. The connecter setup completed and i am able to send the test message. however, while i send the message from topic, i get this error though the schema has been added and the message is valid as per the validation.
[
{
"key": "__connect.errors.topic",
"value": "create_policy"
},
{
"key": "__connect.errors.partition",
"value": "5"
},
{
"key": "__connect.errors.offset",
"value": "1"
},
{
"key": "__connect.errors.connector.name",
"value": "lcc-g7qmwn"
},
{
"key": "__connect.errors.task.id",
"value": "0"
},
{
"key": "__connect.errors.stage",
"value": "VALUE_CONVERTER"
},
{
"key": "__connect.errors.class.name",
"value": "org.apache.kafka.connect.json.JsonConverter"
},
{
"key": "__connect.errors.exception.class.name",
"value": "org.apache.kafka.connect.errors.DataException"
},
{
"key": "__connect.errors.exception.message",
"value": "Converting byte[] to Kafka Connect data failed due to serialization error: "
},
{
"key": "__connect.errors.exception.stacktrace",
"value": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:346)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:550)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:217)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:254)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:550)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:525)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:351)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:253)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:222)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:256)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:311)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing message to JSON in topic create_policy\n\tat org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:74)\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)\n\t... 18 more\n"
}
]