JDBC sink connector Schema

Hi There, I’m trying to insert my data from a Kafka Topic into clickhouse with theJDBC Sink Connector.
This is my connector config

{
  "name": "JdbcSinkConnector",
  "config": {
    "value.converter.schemas.enable": "false",
    "name": "JdbcSinkConnector_auth_events",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.retry.timeout": "100",
    "errors.retry.delay.max.ms": "60000",
    "topics": "my_topic",
    "connection.url": "jdbc:clickhouse://clickhouse:8123/my_db",
    "connection.user": "default",
    "connection.password": "",
    "insert.mode": "insert",
    "batch.size": "1000",
    "table.name.format": "my_tb",
    "pk.mode": "record_value",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}

and this is my message

{
  "ts": 1682006722000,
  "sessionId": 194860,
  "auth": "Logged In",
  "level": "free",
  "itemInSession": 5,
  "city": "Erie",
}

but got an error Error: Sink connector 'JdbcSinkConnector_my_topic' is configured with 'delete.enabled=false' and 'pk.mode=record_value' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='my_topic',partition=0,offset=0,timestamp=1687020175058) with a HashMap value and null value schema. (org.apache.kafka.connect.runtime.WorkerSinkTask)
Pls help me, thanks all

What’s unclear about the error? Plain JSON has no schema. Are you able to modify your producer to send a schema with the JSON, or even better, use a different format like Avro or Protobuf which does have a schema?

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.