I have two different Kafka topics (task, member) that do not share the same column names. Using the JDBC sink connector, I need to write the data from these topics to the MySQL tables test_task and test_member using UPSERT mode.
Data in the task topic (JSON):
{"orderid": 1, "name": "sangeetha", "products": "Mobile", "modeofpayment": "UPI"}
Data in the member topic (JSON):
{"pzinskey": "1", "membername": "sangeetha", "type": "Gold"}
Below is my sink connector configuration:
curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/multipletopics/config -d '{
  "name": "multipletopics",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:mysql://172.19.0.2:3306/sample",
  "connection.user": "user",
  "connection.password": "*****",
  "topics": "task,member",
  "auto.create": "true",
  "auto.evolve": "true",
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "orderid,pzinskey",
  "table.name.format": "test_${topic}",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "tasks.max": "1"
}'
But the above configuration is not working; I get the following error:

Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table 'test_task' is RECORD_VALUE with configured PK fields [orderid, pzinskey], but record value schema does not contain field: pzinskey

It looks like pk.fields is applied to every topic the connector consumes, so each record is expected to contain both orderid and pzinskey. Is there a way to write data from two different topics to two different tables using UPSERT mode via a single JDBC sink connector?
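For reference, the only workaround I can think of is splitting this into one connector per topic, so that each connector's pk.fields matches its table's value schema. A minimal sketch (untested; the connector names task-sink and member-sink are placeholders I made up, everything else is copied from my config above):

```shell
# One connector for the task topic, keyed on orderid only.
curl -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/task-sink/config -d '{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:mysql://172.19.0.2:3306/sample",
  "connection.user": "user",
  "connection.password": "*****",
  "topics": "task",
  "auto.create": "true",
  "auto.evolve": "true",
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "orderid",
  "table.name.format": "test_${topic}",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "tasks.max": "1"
}'

# A second connector for the member topic, keyed on pzinskey only.
curl -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/member-sink/config -d '{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:mysql://172.19.0.2:3306/sample",
  "connection.user": "user",
  "connection.password": "*****",
  "topics": "member",
  "auto.create": "true",
  "auto.evolve": "true",
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "pzinskey",
  "table.name.format": "test_${topic}",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "tasks.max": "1"
}'
```

That doubles the number of connectors to manage, which is why I'm asking whether a single-connector setup is possible.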