JDBC sink connector to load data from 2 different topics into 2 different tables

I have 2 different Kafka topics (task, member) that do not share column names. Using the JDBC sink connector, I have to write the data from these topics to MySQL tables (test_task, test_member) in UPSERT mode.

Data in the task topic (JSON):
{"orderid": 1, "name": "sangeetha", "products": "Mobile", "modeofpayment": "UPI"}

Data in the member topic (JSON):
{"pzinskey": "1", "membername": "sangeetha", "type": "Gold"}

Below is my sink connector configuration:
curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/multipletopics/config -d '{
"name" : "multipletopics",
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://172.19.0.2:3306/sample",
"connection.user": "user",
"connection.password": "*****",
"topics": "task,member",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "orderid,pzinskey",
"table.name.format": "test_${topic}",
"value.converter" : "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": "1"
}'

But the above configuration is not working. I get the error below:
Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table 'test_task' is RECORD_VALUE with configured PK fields [orderid, pzinskey], but record value schema does not contain field: pzinskey

Is there a way to write data from 2 different topics to 2 different tables in UPSERT mode with the JDBC sink connector?

record value schema does not contain field: pzinskey

You’ve shown the records, but please show the JSON Schema you’ve stored in the registry.

Alternatively, as the error says, neither of your records contains both of these fields, so you must use two different connector configurations, one for each topic, each with its own PK.

JSON Schema for the task topic:
{"type":"object","properties":{"orderid":{"type":"number"},"name":{"type":"string"},"products":{"type":"string"},"modeofpayment":{"type":"string"}}}

JSON Schema for the member topic:
{"type":"object","properties":{"pzinskey":{"type":"number"},"membername":{"type":"string"},"type":{"type":"string"}}}

As mentioned, every field listed in pk.fields needs to exist in the records of every topic the connector reads, so the solution here is to create distinct connector configurations.
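
For illustration, splitting the original configuration into one connector per topic might look like the sketch below. The connector names task-sink and member-sink are hypothetical; every other setting is copied from the configuration in the question, with only topics and pk.fields narrowed. This has not been run against the poster's environment, so treat it as a starting point rather than a verified fix.

```shell
# Connector for the "task" topic only; "task-sink" is a hypothetical name.
curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/task-sink/config -d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://172.19.0.2:3306/sample",
"connection.user": "user",
"connection.password": "*****",
"topics": "task",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "orderid",
"table.name.format": "test_${topic}",
"value.converter" : "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": "1"
}'

# Connector for the "member" topic only; "member-sink" is a hypothetical name.
curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/member-sink/config -d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://172.19.0.2:3306/sample",
"connection.user": "user",
"connection.password": "*****",
"topics": "member",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "pzinskey",
"table.name.format": "test_${topic}",
"value.converter" : "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": "1"
}'
```

Because table.name.format is unchanged, the two connectors should still write to test_task and test_member respectively, and each one validates only its own key field against its own topic's schema.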
