hello,
i have these source and sync tables (mysql db):
SOURCE
CREATE TABLE f_event_source
(
id_event
bigint(20) NOT NULL AUTO_INCREMENT,
event_value
varchar(20) DEFAULT NULL,
category_description
varchar(20) DEFAULT NULL,
tm_ins
datetime DEFAULT CURRENT_TIMESTAMP,
tm_upd
datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id_event
),
KEY idx_category
(category_description
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SYNC
CREATE TABLE f_event_target
(
id_event
bigint(20) NOT NULL AUTO_INCREMENT,
event_value
varchar(20) DEFAULT NULL,
category_description
varchar(20) DEFAULT NULL,
tm_ins
datetime DEFAULT CURRENT_TIMESTAMP,
tm_upd
datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id_event
),
KEY idx_category
(category_description
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
my sink connector config:
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql_upsert/config -H “Content-Type: application/json” -d ‘{
“connector.class” : “io.confluent.connect.jdbc.JdbcSinkConnector”,
“connection.url” : “jdbc:mysql://172.1.2.3:3306/db?serverTimezone=UTC”,
“tasks.max” : “1”,
“topics” : “f_event_source”,
“transforms” : “unwrap”,
“transforms.unwrap.type” : “io.debezium.transforms.ExtractNewRecordState”,
“connection.user” : “user”,
“connection.password” : “password”,
“auto.create” : true,
“insert.mode” : “upsert”,
“pk.mode” : “record_key”,
“pk.fields” : “event_value”
}’
the KAFKA topic is:
Struct{id_event=17} {“schema”:{“type”:“struct”,“fields”:[{“type”:“struct”,“fields”:[{“type”:“int64”,“optional”:false,“field”:“id_event”},{“type”:“string”,“optional”:true,“field”:“event_value”},{“type”:“string”,“optional”:true,“field”:“category_description”},{“type”:“int64”,“optional”:true,“name”:“io.debezium.time.Timestamp”,“version”:1,“default”:0,“field”:“tm_ins”},{“type”:“int64”,“optional”:true,“name”:“io.debezium.time.Timestamp”,“version”:1,“default”:0,“field”:“tm_upd”}],“optional”:true,“name”:“test.esperimento.f_event_source.Value”,“field”:“before”},{“type”:“struct”,“fields”:[{“type”:“int64”,“optional”:false,“field”:“id_event”},{“type”:“string”,“optional”:true,“field”:“event_value”},{“type”:“string”,“optional”:true,“field”:“category_description”},{“type”:“int64”,“optional”:true,“name”:“io.debezium.time.Timestamp”,“version”:1,“default”:0,“field”:“tm_ins”},{“type”:“int64”,“optional”:true,“name”:“io.debezium.time.Timestamp”,“version”:1,“default”:0,“field”:“tm_upd”}],“optional”:true,“name”:“test.esperimento.f_event_source.Value”,“field”:“after”},{“type”:“struct”,“fields”:[{“type”:“string”,“optional”:false,“field”:“version”},{“type”:“string”,“optional”:false,“field”:“connector”},{“type”:“string”,“optional”:false,“field”:“name”},{“type”:“int64”,“optional”:false,“field”:“ts_ms”},{“type”:“string”,“optional”:true,“name”:“io.debezium.data.Enum”,“version”:1,“parameters”:{“allowed”:“true,last,false”},“default”:“false”,“field”:“snapshot”},{“type”:“string”,“optional”:false,“field”:“db”},{“type”:“string”,“optional”:true,“field”:“table”},{“type”:“int64”,“optional”:false,“field”:“server_id”},{“type”:“string”,“optional”:true,“field”:“gtid”},{“type”:“string”,“optional”:false,“field”:“file”},{“type”:“int64”,“optional”:false,“field”:“pos”},{“type”:“int32”,“optional”:false,“field”:“row”},{“type”:“int64”,“optional”:true,“field”:“thread”},{“type”:“string”,“optional”:true,“field”:“query”}],“optional”:false,“name”:“io.debezium.connector.mysql.Source”,“field”:“source”},{“type”:“string”,“optional”:false,“field”:“op”},{“type”:“int64”,“optional”:true,“field”:“ts_ms”},{“type”:“struct”,“fields”:[{“type”:“string”,“optional”:false,“field”:“id”},{“type”:“int64”,“optional”:false,“field”:“total_order”},{“type”:“int64”,“optional”:false,“field”:“data_collection_order”}],“optional”:true,“field”:“transaction”}],“optional”:false,“name”:“test.esperimento.f_event_source.Envelope”},“payload”:{“before”:null,“after”:{“id_event”:17,“event_value”:“test143”,“category_description”:“selectw4”,“tm_ins”:1628593336000,“tm_upd”:1628593336000},“source”:{“version”:“1.4.2.Final”,“connector”:“mysql”,“name”:“test”,“ts_ms”:1628586136000,“snapshot”:“false”,“db”:“esperimento”,“table”:“f_event_source”,“server_id”:190773,“gtid”:“d94d3a4b-6ba9-11eb-b997-02f847d0c6bf:163”,“file”:“mysql-bin.000062”,“pos”:15724,“row”:0,“thread”:35,“query”:null},“op”:“c”,“ts_ms”:1628586136964,“transaction”:null}}
with this configuration, when an existing row from the source is updated, a new row in the target table is created - this doesn’t update the existing one.
I tried to use the field “id_event” as record key, but in this case the sink won’t work - basically the sink connector tries to write the key from kafka record above “Struct{id_event=17}” inside a bigint field and fails.
What can i do to make it work?
thx
G.