Dear all,
I want to propagate delete operations to the target database through the JDBC sink connector.
I created this sink:
CREATE SINK CONNECTOR SINK_MYSQL_T4 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://10.192.xx.xx:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'mysql_t4',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://10.192.xx.xx:8081',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = 'id',
'insert.mode' = 'upsert',
'delete.enabled' = 'true'
);
ksql> DESCRIBE CONNECTOR "SINK_MYSQL_T4";
Name : SINK_MYSQL_T4
Class : io.confluent.connect.jdbc.JdbcSinkConnector
Type : sink
State : RUNNING
WorkerId : 10.192.xx.72:8083
Task ID | State | Error Trace
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
*Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'SINK_MYSQL_T4' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='mysql_t4',partition=0,offset=0,timestamp=1631673892351) with a null key and string key schema.*
at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresKey$3(RecordValidator.java:116)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
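The error says the record at offset 0 of partition 0 has a null key. One way to check what the topic actually contains is to print its first few records from ksqlDB. This is only a sketch: it assumes the topic name from the sink definition above, and the limit of 3 is arbitrary. The output should list the key next to the value of each record, so a null key would be visible there.

```sql
-- Print the first records of the topic the sink reads from,
-- to see whether their keys are null (limit chosen arbitrarily).
PRINT 'mysql_t4' FROM BEGINNING LIMIT 3;
```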
The source table has a primary key, and every column is non-empty. How can I resolve the error above? I look forward to a reply. Thank you!