HTTP Sink V2 Connector issue

Hello,

I am first time setting the http sink connector to send an JSON message. The connecter setup completed and i am able to send the test message. however, while i send the message from topic, i get this error though the schema has been added and the message is valid as per the validation.

[
  {
    "key": "__connect.errors.topic",
    "value": "create_policy"
  },
  {
    "key": "__connect.errors.partition",
    "value": "5"
  },
  {
    "key": "__connect.errors.offset",
    "value": "1"
  },
  {
    "key": "__connect.errors.connector.name",
    "value": "lcc-g7qmwn"
  },
  {
    "key": "__connect.errors.task.id",
    "value": "0"
  },
  {
    "key": "__connect.errors.stage",
    "value": "VALUE_CONVERTER"
  },
  {
    "key": "__connect.errors.class.name",
    "value": "org.apache.kafka.connect.json.JsonConverter"
  },
  {
    "key": "__connect.errors.exception.class.name",
    "value": "org.apache.kafka.connect.errors.DataException"
  },
  {
    "key": "__connect.errors.exception.message",
    "value": "Converting byte[] to Kafka Connect data failed due to serialization error: "
  },
  {
    "key": "__connect.errors.exception.stacktrace",
    "value": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:346)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:550)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:217)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:254)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:550)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:525)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:351)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:253)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:222)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:256)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:311)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing message to JSON in topic create_policy\n\tat org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:74)\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)\n\t... 18 more\n"
  }
]

hey @Kuldeep-123

your topic contains Json data?

best,
michael

Yes the payload in Jason

Are you using JSON Schema? I believe that you’d hit this specific error if so and would want to use io.confluent.connect.json.JsonSchemaConverter instead of org.apache.kafka.connect.json.JsonConverter.

If that’s not it, could you please share a message and your connector config?