Kafka connect - JDBC sink connectivity with KSQL

Hello Team,

I am having an issue with JDBC sink connector to consume topic created by KSQL.
below options I have tried to make it work:

  1. with key and without key
  2. with schema registry and with schema manually created
  3. with AVRO and with JSON
    two types of Errors I am facing,
    with scenario 3 error looks like below

    with scenario 1 & 2 error says
    .DataException: JsonConverter with schemas.enable requires “schema” and “payload” fields and may not contain additional fields

I followed few articles and videos simillar to my issue but none of them worked
reference articles:

  1. :movie_camera: ksqlDB and the Kafka Connect JDBC Sink - Kafka Connect / Self-Managed Connectors - Confluent Community
  2. Can not write stream into jdbc sink unless key is set · Issue #3487 · confluentinc/ksql · GitHub

My configuration and topic given below, which I am trying to sink in Oracle database:

  1. scenario without key:
    {
    “name”: “destination-connector-simple”,
    “config”: {
    “connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”,
    “topics”: “MY_STREAM1”,
    “tasks.max”: “1”,
    “connection.url”: “jdbc:oracle:thin:@oracle21:1521/orclpdb1”,
    “connection.user”: “c__sinkuser”,
    “connection.password”: “sinkpw”,
    “table.name.format”: “kafka_customers”,
    “auto.create”: “true”,
    “key.ignore”:“true”,
    “pk.mode”: “none”,
    “value.converter.schemas.enable”: “false”,
    “key.converter.schemas.enable”: “false”,
    “key.converter”: “org.apache.kafka.connect.storage.StringConverter”
    }
    }

  2. scenario with key:
    {
    “name”: “oracle-sink”,
    “config”: {
    “connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”,
    “tasks.max”: “1”,
    “topics”: “MY_EMPLOYEE”,
    “table.name.format”: “kafka_customers”,
    “connection.url”: “jdbc:oracle:thin:@oracle21:1521/orclpdb1”,
    “connection.user”: “c__sinkuser”,
    “connection.password”: “sinkpw”,
    “auto.create”:true,
    “auto.evolve”:true,
    “pk.fields”: “ID”,
    “insert.mode”:“upsert”,
    “delete.enabled”:true,
    “delete.retention.ms”:100,
    “pk.mode”: “record_key”,
    “key.converter”: “io.confluent.connect.json.JsonSchemaConverter”,
    “key.converter.schema.registry.url”: “h t tp :confused: / schema-registry :8081”,
    “value.converter”: “io.confluent.connect.json.JsonSchemaConverter”,
    “value.converter.schema.registry.url”: “htt p : / /schema-registry :8081”
    }
    }

topic to be consumed in sink (Any one of them would be okay):

  1. without key
    print ‘MY_STREAM1’ from beginning;
    Key format: ¯_(ツ)_/¯ - no data processed
    Value format: JSON or KAFKA_STRING
    rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {“L_EID”:“101”,“NAME”:“Dhruv”,“LNAME”:“S”,“L_ADD_ID”:“201”}, partition: 0
    rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {“L_EID”:“102”,“NAME”:“Dhruv1”,“LNAME”:“S1”,“L_ADD_ID”:“202”}, partition: 0

  2. topic with key:
    ksql> print ‘MY_EMPLOYEE’ from beginning;
    Key format: JSON or KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 2023/02/05 18:16:16.553 Z, key: 101, value: {“EID”:“101”,“NAME”:“Dhruv”,“LNAME”:“S”,“ADD_ID”:“201”}, partition: 0
    rowtime: 2023/02/05 18:16:16.554 Z, key: 102, value: {“EID”:“102”,“NAME”:“Dhruv1”,“LNAME”:“S1”,“ADD_ID”:“202”}, partition: 0

  3. Topic with schema (manually created)
    ksql> print ‘E_SCHEMA’ from beginning;
    Key format: ¯_(ツ)_/¯ - no data processed
    Value format: JSON or KAFKA_STRING
    rowtime: 2023/02/06 20:01:25.824 Z, key: , value: {“SCHEMA”:{“TYPE”:“struct”,“FIELDS”:[{“TYPE”:“int32”,“OPTIONAL”:false,“FIELD”:“L_EID”},{“TYPE”:“int32”,“OPTIONAL”:false,“FIELD”:“NAME”},{“TYPE”:“int32”,“OPTIONAL”:false,“FIELD”:“LAME”},{“TYPE”:“int32”,“OPTIONAL”:false,“FIELD”:“L_ADD_ID”}],“OPTIONAL”:false,“NAME”:“”},“PAYLOAD”:{“L_EID”:“201”,“NAME”:“Vishuddha”,“LNAME”:“Sh”,“L_ADD_ID”:“401”}}, partition: 0

  4. Topic with Avro:
    ksql> print ‘MY_STREAM_AVRO’ from beginning;
    Key format: ¯_(ツ)_/¯ - no data processed
    Value format: AVRO or KAFKA_STRING
    rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {“L_EID”: “101”, “NAME”: “Dhruv”, “LNAME”: “S”, “L_ADD_ID”: “201”}, partition: 0
    rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {“L_EID”: “102”, “NAME”: “Dhruv1”, “LNAME”: “S1”, “L_ADD_ID”: “202”}, partition: 0
    rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {“L_EID”: “101”, “NAME”: “Dhruv”, “LNAME”: “S”, “L_ADD_ID”: “201”}, partition: 0
    rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {“L_EID”: “102”, “NAME”: “Dhruv1”, “LNAME”: “S1”, “L_ADD_ID”: “202”}, partition: 0

could you please help me complete my POC in time.

This topic was automatically closed after 30 days. New replies are no longer allowed.