Hello Team,
I am having an issue with a JDBC sink connector consuming a topic created by ksqlDB. I have tried the following options to make it work:
1. with key and without key
2. with Schema Registry and with a manually created schema
3. with Avro and with JSON
I am facing two types of errors. With scenario 3 the error looks like the one below, and with scenarios 1 & 2 the error says:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
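For context, my understanding is that the plain JsonConverter with schemas.enable=true only accepts messages with exactly these two lowercase top-level fields, shaped like the following (the field list here is illustrative, just mirroring my stream):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "string", "optional": false, "field": "L_EID" },
      { "type": "string", "optional": false, "field": "NAME" }
    ],
    "optional": false
  },
  "payload": { "L_EID": "101", "NAME": "Dhruv" }
}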
I followed a few articles and videos about similar issues, but none of them worked.
Reference articles:
- ksqlDB and the Kafka Connect JDBC Sink - Kafka Connect / Self-Managed Connectors - Confluent Community
- Can not write stream into jdbc sink unless key is set · Issue #3487 · confluentinc/ksql · GitHub
My configurations and the topics I am trying to sink into an Oracle database are given below.

Scenario without key:
{
  "name": "destination-connector-simple",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "MY_STREAM1",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@oracle21:1521/orclpdb1",
    "connection.user": "c__sinkuser",
    "connection.password": "sinkpw",
    "table.name.format": "kafka_customers",
    "auto.create": "true",
    "key.ignore": "true",
    "pk.mode": "none",
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
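(Note: I am not setting value.converter explicitly in this scenario, so the worker default applies; the schemas.enable properties above assume that default is org.apache.kafka.connect.json.JsonConverter, i.e. the equivalent explicit settings would be:)

  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"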
Scenario with key:
{
  "name": "oracle-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "MY_EMPLOYEE",
    "table.name.format": "kafka_customers",
    "connection.url": "jdbc:oracle:thin:@oracle21:1521/orclpdb1",
    "connection.user": "c__sinkuser",
    "connection.password": "sinkpw",
    "auto.create": true,
    "auto.evolve": true,
    "pk.fields": "ID",
    "insert.mode": "upsert",
    "delete.enabled": true,
    "delete.retention.ms": 100,
    "pk.mode": "record_key",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
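As far as I understand, the JsonSchemaConverter can only deserialise data that was written with the Schema Registry JSON Schema serialiser, so for this scenario the topic would have to be produced as JSON_SR. A sketch of how I could re-serialise it in ksqlDB (MY_EMPLOYEE_SR is a placeholder name of mine, and I am assuming a stream MY_EMPLOYEE already exists over the topic):

CREATE STREAM MY_EMPLOYEE_SR
  WITH (KAFKA_TOPIC='MY_EMPLOYEE_SR', KEY_FORMAT='JSON_SR', VALUE_FORMAT='JSON_SR')
  AS SELECT * FROM MY_EMPLOYEE EMIT CHANGES;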
Topics to be consumed by the sink (any one of them would be okay):
Topic without key:
ksql> print 'MY_STREAM1' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {"L_EID":"101","NAME":"Dhruv","LNAME":"S","L_ADD_ID":"201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {"L_EID":"102","NAME":"Dhruv1","LNAME":"S1","L_ADD_ID":"202"}, partition: 0
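(For reference, the MY_STREAM_AVRO topic further below holds these same rows re-serialised to Avro; following the first reference article, the statement to derive it would be along these lines:)

CREATE STREAM MY_STREAM_AVRO
  WITH (VALUE_FORMAT='AVRO')
  AS SELECT * FROM MY_STREAM1 EMIT CHANGES;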
Topic with key:
ksql> print 'MY_EMPLOYEE' from beginning;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2023/02/05 18:16:16.553 Z, key: 101, value: {"EID":"101","NAME":"Dhruv","LNAME":"S","ADD_ID":"201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: 102, value: {"EID":"102","NAME":"Dhruv1","LNAME":"S1","ADD_ID":"202"}, partition: 0
Topic with schema (manually created):
ksql> print 'E_SCHEMA' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2023/02/06 20:01:25.824 Z, key: , value: {"SCHEMA":{"TYPE":"struct","FIELDS":[{"TYPE":"int32","OPTIONAL":false,"FIELD":"L_EID"},{"TYPE":"int32","OPTIONAL":false,"FIELD":"NAME"},{"TYPE":"int32","OPTIONAL":false,"FIELD":"LAME"},{"TYPE":"int32","OPTIONAL":false,"FIELD":"L_ADD_ID"}],"OPTIONAL":false,"NAME":""},"PAYLOAD":{"L_EID":"201","NAME":"Vishuddha","LNAME":"Sh","L_ADD_ID":"401"}}, partition: 0
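(One thing I notice here: the scenario 1 & 2 error demands lowercase "schema" and "payload", but ksqlDB has upper-cased all my field names. If that is the cause, I assume the same record would have to be serialised as:)

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"L_EID"},{"type":"int32","optional":false,"field":"NAME"},{"type":"int32","optional":false,"field":"LAME"},{"type":"int32","optional":false,"field":"L_ADD_ID"}],"optional":false,"name":""},"payload":{"L_EID":"201","NAME":"Vishuddha","LNAME":"Sh","L_ADD_ID":"401"}}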
Topic with Avro:
ksql> print 'MY_STREAM_AVRO' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {"L_EID": "101", "NAME": "Dhruv", "LNAME": "S", "L_ADD_ID": "201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {"L_EID": "102", "NAME": "Dhruv1", "LNAME": "S1", "L_ADD_ID": "202"}, partition: 0
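For the Avro topic, I assume the sink would need the Avro converter instead of the JSON ones, along these lines (a sketch, with the Schema Registry URL from my setup):

  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"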
Could you please help me complete my POC in time?