How can the JDBC sink synchronize delete operations?

Dear all,
I want to synchronize delete operations through the JDBC sink connector.

I created this sink:

CREATE SINK CONNECTOR SINK_MYSQL_T4 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://10.192.xx.xx:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'mysql_t4',
    'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://10.192.xx.xx:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_key',
    'pk.fields'                           = 'id',
    'insert.mode'                         = 'upsert',
    'delete.enabled'                      = 'true'
);
ksql> DESCRIBE CONNECTOR  "SINK_MYSQL_T4"
>;

Name                 : SINK_MYSQL_T4
Class                : io.confluent.connect.jdbc.JdbcSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : 10.192.xx.72:8083

 Task ID | State  | Error Trace                                                                                                                                                                                                                                                                                                                                                                              
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
*Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'SINK_MYSQL_T4' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='mysql_t4',partition=0,offset=0,timestamp=1631673892351) with a null key and string key schema.*
	at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresKey$3(RecordValidator.java:116)
	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more

The source table has a primary key, and every column is NOT NULL. How can I solve the above error? I look forward to a reply, thank you!

The source table:

mysql> show create table t4 \G
*************************** 1. row ***************************
       Table: t4
Create Table: CREATE TABLE `t4` (
  `id` int(11) NOT NULL,
  `name` varchar(100) NOT NULL,
  `age` int(11) NOT NULL,
  `address` varchar(100) NOT NULL,
  `joining_date` datetime NOT NULL,
  `department_id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
1 row in set (0.00 sec)

The error is quite a good one. The message that it read from the topic mysql_t4, at the given partition and offset (0/0), has a null key.

How are you ingesting the data into the Kafka topic in the first place? It’s not so much about the source table DDL as the Kafka message itself that the JDBC sink is consuming.


Some general references that you might also find useful:

(specifically see this point in the video: Configuring DELETEs from tombstone messages)
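To make the tombstone point concrete, here is a rough Python sketch of the rule the error message describes for 'delete.enabled=true' with 'pk.mode=record_key'. A record with a null key is rejected, a record with a key but a null value (a tombstone) is treated as a delete, and anything else is an upsert. The function name plan_action and the tuple it returns are purely illustrative and are not part of the connector's API.

```python
from typing import Optional, Tuple


def plan_action(key: Optional[int], value: Optional[dict]) -> Tuple[str, int]:
    """Hypothetical model of how one record is handled when deletes are enabled."""
    if key is None:
        # Mirrors the failure in this thread: record_key mode needs a non-null key.
        raise ValueError("null key: pk.mode=record_key requires a non-null key")
    if value is None:
        # Tombstone: the key is present but the value is null, so the row is deleted.
        return ("delete", key)
    # Normal record: insert the row, or update it if the key already exists.
    return ("upsert", key)


print(plan_action(1, {"name": "Ninco", "age": 28}))  # ('upsert', 1)
print(plan_action(1, None))                          # ('delete', 1)
```

Note that this only models the sink side. Something still has to write the tombstone to the topic in the first place.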

Thank you for your reply.

I use this source connector to sync data to Kafka:

 curl -X POST http://10.192.xx.xx:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_yyt_04",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://10.192.xx.xx:3306/test",
                "connection.user": "root",
                "connection.password": "Mysql@123$",
                "topic.prefix": "mysql-0",
                "mode":"bulk",
                "poll.interval.ms" : 100000,
                "catalog.pattern" : "test",
                "table.whitelist" : "t4"
                }
        }'

Is it related to this configuration?

You’re not setting the key on the data that’s being ingested. You can use a Single Message Transform to do this. Check out these resources:

https://kafka-tutorials.confluent.io/connect-add-key-to-source/kafka.html
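As a rough illustration of what that pair of transforms does to each record, the Python sketch below models ValueToKey followed by ExtractField$Key on a plain dict. The helper names and the dict-based record are assumptions made for the example; the real transforms operate on Kafka Connect records and schemas, not on dicts.

```python
def value_to_key(record: dict, fields: list) -> dict:
    """Model of ValueToKey: build a struct-like key from the named value fields."""
    key = {name: record["value"][name] for name in fields}
    return {**record, "key": key}


def extract_field_from_key(record: dict, field: str) -> dict:
    """Model of ExtractField$Key: replace the struct key with a single field of it."""
    return {**record, "key": record["key"][field]}


# A record as the JDBC source emits it by default: a value but no key.
record = {"key": None, "value": {"id": 1, "name": "Ninco", "age": 28}}

with_struct_key = value_to_key(record, ["id"])          # key becomes {"id": 1}
keyed = extract_field_from_key(with_struct_key, "id")   # key becomes 1

print(with_struct_key["key"])  # {'id': 1}
print(keyed["key"])            # 1
```

The value still contains id after both steps. The transforms copy the field into the key rather than moving it.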

Hi, Robin

I recreated the source and sink according to your suggestion, but encountered the following error, can you give me some suggestions?

curl -X POST http://10.192.xx.xx:8083/connectors -H "Content-Type: application/json" -d '
{ "name": "jdbc_source_yyt_05",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://10.192.15.xx:3306/test",
    "connection.user": "root",
    "connection.password": "Mysql@123$",
    "topic.prefix": "mysql-02-",
    "poll.interval.ms": 10000,
    "tasks.max": 1,
    "table.whitelist": "t4",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "validate.non.null": false,
    "transforms": "copyIdToKey,extractKeyFromStruct",
    "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.copyIdToKey.fields": "id",
    "transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKeyFromStruct.field": "id"
  }
}'

create sink:

ksql> CREATE SINK CONNECTOR SINK_mysql_t4 WITH (

    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://10.192.15.xx:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'mysql-02-t4',
    'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://10.192.xx.xx:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_key',
    'pk.fields'                           = 'id',
    'insert.mode'                         = 'upsert',
    'delete.enabled'                      = 'true'
);

Message

Created connector SINK_MYSQL_T4

ksql> DESCRIBE CONNECTOR SINK_MYSQL_T4;

Name : SINK_MYSQL_T4
Class : io.confluent.connect.jdbc.JdbcSinkConnector
Type : sink
State : RUNNING
WorkerId : 10.192.xx.xx:8083

Task ID | State | Error Trace

0 | RUNNING |

ksql> DESCRIBE CONNECTOR SINK_MYSQL_T4;

Name : SINK_MYSQL_T4
Class : io.confluent.connect.jdbc.JdbcSinkConnector
Type : sink
State : RUNNING
WorkerId : 10.192.xx.xx:8083

Task ID | State | Error Trace

0 | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "mysql-02-t4" ("id","name","age","address","joining_date","department_id") VALUES ('
','Ninco',28,'zhongshan','2021-09-10 07:38:34+00'::timestamp,29) ON CONFLICT ("id") DO UPDATE SET "name"=EXCLUDED."name","age"=EXCLUDED."age","address"=EXCLUDED."address","joining_date"=EXCLUDED."joining_date","department_id"=EXCLUDED."department_id" was aborted: ERROR: invalid byte sequence for encoding "UTF8": 0x00 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00

	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more

Caused by: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "mysql-02-t4" ("id","name","age","address","joining_date","department_id") VALUES ('
','Ninco',28,'zhongshan','2021-09-10 07:38:34+00'::timestamp,29) ON CONFLICT ("id") DO UPDATE SET "name"=EXCLUDED."name","age"=EXCLUDED."age","address"=EXCLUDED."address","joining_date"=EXCLUDED."joining_date","department_id"=EXCLUDED."department_id" was aborted: ERROR: invalid byte sequence for encoding "UTF8": 0x00 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00

	at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
	... 11 more

ksql>

I checked the target database. Because I set 'auto.create' = 'true', the table was created automatically, but the id column has type text, whereas in the source database the id column is an int. Why?

postgres=# \d mysql-02-t4
                      Table "public.mysql-02-t4"
    Column     |            Type             | Collation | Nullable | Default
---------------+-----------------------------+-----------+----------+---------
 id            | text                        |           | not null |
 name          | text                        |           | not null |
 age           | integer                     |           | not null |
 address       | text                        |           | not null |
 joining_date  | timestamp without time zone |           | not null |
 department_id | integer                     |           | not null |
Indexes:
    "mysql-02-t4_pkey" PRIMARY KEY, btree (id)

Also, the error shows:

"Exception chain:
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "mysql-02-t4" ("id","name","age","address","joining_date","department_id") VALUES ('
','Ninco',28,'zhongshan','2021-09-10 07:38:34+00'::timestamp,29) ON CONFLICT ("id") DO UPDATE SET "name"=EXCLUDED."name","age"=EXCLUDED."age","address"=EXCLUDED."address","joining_date"=EXCLUDED."joining_date","department_id"=EXCLUDED."department_id" was aborted: ERROR: invalid byte sequence for encoding "UTF8": 0x00 Call getNextException to see other errors in the batch."

The id value appears to be empty. Why?
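One possible explanation, which nothing in this thread confirms: the source connector above sets no key.converter, so the key is written with whatever the worker default is, while the sink reads it with StringConverter. If the key bytes are a binary-encoded integer (for example the 4-byte big-endian form that IntegerConverter writes), reading them back as a UTF-8 string gives text full of 0x00 characters. PostgreSQL rejects those in a text column. It would also explain why auto.create chose text for id, since the sink sees a string key. The Python sketch below only reproduces the byte-level effect, and the id value 1 is an assumed example, not taken from the failing record.

```python
import struct

# Assumed example id; the real id of the failing record is not shown in the thread.
source_id = 1

# A 4-byte big-endian signed integer, the layout used for a serialized int key.
key_bytes = struct.pack(">i", source_id)

# Reading those bytes as a UTF-8 string "works", but the result is not the text "1".
as_text = key_bytes.decode("utf-8")

print(key_bytes)           # b'\x00\x00\x00\x01'
print(repr(as_text))       # '\x00\x00\x00\x01'
print("\x00" in as_text)   # True
```

If this is the cause, using the same key converter on the source and the sink should avoid it.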

Dear rmoff,
I followed this document to synchronize the MySQL table to PostgreSQL: ksqlDB Tutorial: Add key to data ingested through Kafka Connect using ksqlDB
Why are update and delete operations not synchronized?

DROP TABLE IF EXISTS cities;
CREATE TABLE cities (city_id int PRIMARY KEY NOT NULL, name VARCHAR(255), state VARCHAR(255));
INSERT INTO cities (city_id, name, state) VALUES (1, 'Raleigh', 'NC');
INSERT INTO cities (city_id, name, state) VALUES (2, 'Mountain View', 'CA');
INSERT INTO cities (city_id, name, state) VALUES (3, 'Knoxville', 'TN');
INSERT INTO cities (city_id, name, state) VALUES (4, 'Houston', 'TX');
INSERT INTO cities (city_id, name, state) VALUES (5, 'Olympia', 'WA');
INSERT INTO cities (city_id, name, state) VALUES (6, 'Bismarck', 'ND');
INSERT INTO cities (city_id, name, state) VALUES (7, 'GuangZhou', 'GZ');
INSERT INTO cities (city_id, name, state) VALUES (8, 'Foshan', 'FS');

CREATE SOURCE CONNECTOR JDBC_SOURCE_POSTGRES_01 WITH (
    'connector.class'                         = 'io.confluent.connect.jdbc.JdbcSourceConnector',
    'connection.url'                          = 'jdbc:mysql://10.192.39.183:3306/mcs_t',
    'connection.user'                         = 'XP_Admin',
    'connection.password'                     = 'XP_AdminXP_Admin',
    'mode'                                    = 'incrementing',
    'table.whitelist'                         = 'cities',
    'incrementing.column.name'                = 'city_id',
    'topic.prefix'                            = 'mysql_01_',
    'poll.interval.ms'                        = '10000',
    'transforms'                              = 'copyFieldToKey,extractKeyFromStruct,removeKeyFromValue',
    'transforms.copyFieldToKey.type'          = 'org.apache.kafka.connect.transforms.ValueToKey',
    'transforms.copyFieldToKey.fields'        = 'city_id',
    'transforms.extractKeyFromStruct.type'    = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.extractKeyFromStruct.field'   = 'city_id',
    'transforms.removeKeyFromValue.type'      = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
    'transforms.removeKeyFromValue.blacklist' = 'city_id',
    'key.converter'                           = 'org.apache.kafka.connect.converters.IntegerConverter'
);

CREATE SINK CONNECTOR SINK_FOO_09 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://10.192.15.69:5432/postgres',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'mysql_01_cities',
    'key.converter'                       = 'org.apache.kafka.connect.converters.IntegerConverter',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://10.192.11.71:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_key',
    'pk.fields'                           = 'city_id',
    'insert.mode'                         = 'upsert',
    'delete.enabled'                      = 'true'
);

This time the table mysql_01_cities is created in the target database, but only insert operations are synchronized from the source.
When I run an update or a delete on the source table, the topic does not show any change. Why?

mysql> select * from cities;
+---------+---------------+-------+
| city_id | name          | state |
+---------+---------------+-------+
|       1 | Foshan        | FS    |
|       2 | Mountain View | CA    |
|       3 | Hangzhou      | TN    |
|       4 | Houston       | TX    |
|       5 | Olympia       | WA    |
|       6 | Bismarck      | ND    |
|       7 | GuangZhou     | GZ    |
|       8 | Foshan        | FS    |
|       9 | Nanhai        | NH    |
+---------+---------------+-------+
9 rows in set (0.00 sec)

mysql> INSERT INTO cities (city_id, name, state) VALUES (10, 'USA', 'CA');
Query OK, 1 row affected (0.00 sec)

mysql> commit;
Query OK, 0 rows affected (0.00 sec)

mysql> INSERT INTO cities (city_id, name, state) VALUES (11, 'English', 'UK');
Query OK, 1 row affected (0.00 sec)

mysql> commit;
Query OK, 0 rows affected (0.00 sec)

mysql> update cities set name='Amercian' where city_id=10;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0

mysql> commit;
Query OK, 0 rows affected (0.00 sec)

mysql> delete from cities where city_id=11;
Query OK, 1 row affected (0.00 sec)

mysql> commit;
Query OK, 0 rows affected (0.00 sec)

mysql>
ksql> print mysql_01_cities from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/09/18 05:43:15.527 Z, key: 9, value: {"name": "Nanhai", "state": "NH"}, partition: 0
rowtime: 2021/09/18 05:47:35.486 Z, key: 10, value: {"name": "USA", "state": "CA"}, partition: 0
rowtime: 2021/09/18 05:48:05.496 Z, key: 11, value: {"name": "English", "state": "UK"}, partition: 0

Press CTRL-C to interrupt
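
This is most likely the expected behaviour of a query-based source rather than a fault in the sink. In 'incrementing' mode the JDBC source connector only fetches rows whose incrementing column is greater than the last value it has seen. An UPDATE to an existing row or a DELETE therefore produces nothing new to fetch, and no tombstone is ever written for the sink to act on. The Python sketch below imitates that polling pattern with an in-memory SQLite table. The poll function and the simplified query are assumptions made for illustration, not the connector's actual code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cities (city_id INTEGER PRIMARY KEY, name TEXT, state TEXT)")
conn.execute("INSERT INTO cities VALUES (9, 'Nanhai', 'NH')")

last_seen = 0   # stands in for the offset the connector stores
topic = []      # stands in for the Kafka topic


def poll() -> list:
    """One poll in the style of incrementing mode: only rows above the stored offset."""
    global last_seen
    rows = conn.execute(
        "SELECT city_id, name, state FROM cities WHERE city_id > ? ORDER BY city_id",
        (last_seen,),
    ).fetchall()
    for row in rows:
        topic.append(row)
        last_seen = row[0]
    return rows


poll()  # picks up row 9

conn.execute("INSERT INTO cities VALUES (10, 'USA', 'CA')")
conn.execute("INSERT INTO cities VALUES (11, 'English', 'UK')")
poll()  # picks up the new rows 10 and 11

conn.execute("UPDATE cities SET name = 'Amercian' WHERE city_id = 10")
conn.execute("DELETE FROM cities WHERE city_id = 11")
after_changes = poll()  # nothing has a city_id above 11

print(after_changes)            # []
print([r[0] for r in topic])    # [9, 10, 11]
print(topic[1][1])              # USA
```

Capturing updates with the JDBC source generally needs 'timestamp' or 'timestamp+incrementing' mode with a column that changes on every update. Capturing deletes generally needs log-based change data capture (for example Debezium), because a deleted row can no longer be returned by a query.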
