Upsert mode not working as expected

Hello everybody,
I'm configuring the sink for my Apache Kafka connector to write to a MySQL DB.
This is the config of my sink connector:

"Content-Type: application/json" -d '{
     "connector.class"       : "io.confluent.connect.jdbc.JdbcSinkConnector",
     "connection.url"        : "jdbc:mysql://172.48.49.23:3306/DB?serverTimezone=UTC",
     "tasks.max"             : "1",
     "topics"                : "USERS",
     "transforms"            : "unwrap",
     "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
     "connection.user"       : "user",
     "connection.password"   : "password",
     "auto.create"           : true,
     "auto.evolve"           : true,
     "insert.mode"           : "upsert",
     "pk.mode"               : "record_value",
     "pk.fields"             : "USERNAME"
 }'

It works fine: it connects to the DB and writes correctly. The problem is that when I update a row on the source DB that has already been replicated to the destination DB, the sink connector adds a new row instead of modifying the existing one.
Am I missing anything? I tried both of the other options, insert and update, and they both work as expected.

Thanks in advance
Gerardo

Can you share the target table DDL, and an example of a row before and after a Kafka message is processed? And can you share the full Kafka message too?

In case you’ve not seen it, I recently did a deep-dive on PK handling, including upserts: Kafka Connect JDBC Sink deep-dive: Working with Primary Keys

Hi! Sure, I had read this link; it was actually the one I used to start building my sink connector.

So, about the info you needed. This is the DDL of the source table:

CREATE TABLE `table` (
   `USERNAME` varchar(50) NOT NULL,
   `PASSWORD` varchar(255) NOT NULL,
   `ENABLED` varchar(20) NOT NULL,
   `DATE_CREATION` date NOT NULL,
   `DATE_LAST_UPDATE` date DEFAULT NULL,
   `NAME` varchar(50) NOT NULL,
   `SURNAME` varchar(50) NOT NULL,
   `OFFICE_LOCATION` varchar(50) NOT NULL,
   `JOB_POSITION` varchar(50) NOT NULL,
   `CHANGE_PASSWORD` varchar(20) DEFAULT NULL,
   `DATE_LAST_PASSWORD` date DEFAULT NULL,
   `must_change_password` varchar(255) DEFAULT NULL,
   `NOTES` varchar(250) NOT NULL,
   `PREFERRED_TIMEZONE` varchar(20) DEFAULT NULL,
   PRIMARY KEY (`USERNAME`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8

This is one of the rows from this table:

suser | {bcrypt}$2a$10$O3iiddefefefwfewfw3cow345jjKZsIfuApNnJL3Xi | TRUE | 2020-11-24 | 2021-01-05 | Sample | User | Rome | IT professional |  | 2021-01-05 | FALSE

And here is how it is reported in the sink DB:

suser | {bcrypt}$2a$10$O3iiddefefefwfewfw3cow345jjKZsIfuApNnJL3Xi | TRUE | 18590 | 18632 | Sample | User | Rome | IT professional | NULL | 18632 | FALSE |
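
A side note on the numbers in the sink row: 18590 and 18632 match the source dates if they are read as days since the Unix epoch, which as far as I know is how Debezium encodes a MySQL DATE column in its default time precision mode (`io.debezium.time.Date`). A minimal Python sketch of the conversion, assuming that encoding; the helper name `epoch_days_to_date` is made up for illustration:

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def epoch_days_to_date(days: int) -> date:
    """Convert a day count since 1970-01-01 into a calendar date."""
    return EPOCH + timedelta(days=days)


# Values taken from the sink row above.
print(epoch_days_to_date(18590))  # 2020-11-24 (DATE_CREATION in the source row)
print(epoch_days_to_date(18632))  # 2021-01-05 (DATE_LAST_UPDATE in the source row)
```

So the dates themselves appear to have been replicated correctly; they are only stored as integers in the sink.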

Not sure what you mean by Kafka message… the entry in the topic related to this row?

Thanks a lot!
Gerardo

I meant what’s the payload that the JDBC Sink connector is going to be reading to then write to the target database? Specifically, how is it serialised and does the USERNAME field exist in the value part of the message?
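
To make the question concrete: with `pk.mode` set to `record_value`, the connector takes the key columns from the message value, so `USERNAME` has to be a top-level field of the value once the `unwrap` transform has run. Below is a rough Python sketch of that check on an already-decoded message. The envelope dict is hand-written from the row in this thread, not an actual decoded message, and `unwrap` and `has_pk_fields` are hypothetical helpers that only mimic what I understand `ExtractNewRecordState` to do for inserts and updates:

```python
def unwrap(envelope: dict) -> dict:
    """Return the 'after' state of a Debezium change event (insert/update only)."""
    return envelope["after"]


def has_pk_fields(value: dict, pk_fields: list) -> bool:
    """True if every configured pk.fields entry is a top-level field of the value."""
    return all(field in value for field in pk_fields)


# Illustrative update event, trimmed to two columns.
envelope = {
    "before": {"USERNAME": "suser", "ENABLED": "FALSE"},
    "after": {"USERNAME": "suser", "ENABLED": "TRUE"},
    "op": "u",
}

print(has_pk_fields(envelope, ["USERNAME"]))          # False: still wrapped in the envelope
print(has_pk_fields(unwrap(envelope), ["USERNAME"]))  # True: flattened value
```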

the sink connector adds a new line instead of modifying the already existing.

You’ve shared the source DDL - what does the sink table DDL look like?

Hi,
Apologies! This is the sink table DDL:

CREATE TABLE `BO_USERS` (
  `USERNAME` text NOT NULL,
  `PASSWORD` text NOT NULL,
  `ENABLED` text NOT NULL,
  `DATE_CREATION` int(11) NOT NULL,
  `DATE_LAST_UPDATE` int(11) DEFAULT NULL,
  `NAME` text NOT NULL,
  `SURNAME` text NOT NULL,
  `OFFICE_LOCATION` text NOT NULL,
  `JOB_POSITION` text NOT NULL,
  `CHANGE_PASSWORD` text,
  `DATE_LAST_PASSWORD` int(11) DEFAULT NULL,
  `must_change_password` text,
  `NOTES` text NOT NULL,
  `PREFERRED_TIMEZONE` text
) ENGINE=InnoDB DEFAULT CHARSET=utf8
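
One thing that stands out in the sink DDL above is that, unlike the source table, it declares no PRIMARY KEY. As far as I know, the JDBC sink issues its MySQL upsert as `INSERT ... ON DUPLICATE KEY UPDATE`, which only updates when a primary or unique key collides; without such a key, every write ends up as a plain insert. The sketch below reproduces that behaviour with Python's built-in `sqlite3` module, with `INSERT OR REPLACE` standing in for the MySQL statement. That substitution is an assumption made to keep the example runnable, not the connector's actual SQL, and the `users` table and the `upsert_twice` helper are hypothetical:

```python
import sqlite3


def upsert_twice(create_sql: str) -> list:
    """Create the table, write the same USERNAME twice, and return all rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(create_sql)
    for enabled in ("TRUE", "FALSE"):
        # INSERT OR REPLACE only replaces a row when a PRIMARY KEY or
        # UNIQUE constraint collides; otherwise it behaves like INSERT.
        conn.execute(
            "INSERT OR REPLACE INTO users (USERNAME, ENABLED) VALUES (?, ?)",
            ("suser", enabled),
        )
    rows = conn.execute(
        "SELECT USERNAME, ENABLED FROM users ORDER BY rowid"
    ).fetchall()
    conn.close()
    return rows


no_key = upsert_twice(
    "CREATE TABLE users (USERNAME TEXT NOT NULL, ENABLED TEXT NOT NULL)"
)
with_key = upsert_twice(
    "CREATE TABLE users (USERNAME TEXT PRIMARY KEY, ENABLED TEXT NOT NULL)"
)

print(no_key)    # two rows: the second write was appended
print(with_key)  # one row: the second write replaced the first
```

If that is what is happening here, adding a primary key on `USERNAME` to the sink table would be the first thing to try. Note that MySQL cannot index a `text` column without a prefix length, so the column type may need to become a `varchar` first.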

The row from the source DB is serialized using the Avro converter (in both connector configurations, source and sink).
This is one example of the messages (unfortunately they are not very readable, since they are Avro-encoded; not sure if you have a way to decode them):

4mysql-bin-changelog.634143▒▒▒▒▒%u▒▒▒^
Struct{USERNAME=suser}     esuser▒{bcrypt}$2a$10$O3iipTN7JGswomW95nVBG.kmExKqm3cow345jjKZsIfuApNnJL3Xi
FALSE▒▒▒SampleUserRomeIT professional▒▒
FALSEsuser▒{bcrypt}$2a$10$O3iipTN7JGswomW95nVBG.kmExKqm3cow345jjKZsIfuApNnJL3XTRUE▒▒▒SampleUserRomeIT professional▒▒
FALSE1.4.2.Final
mysqlpreprod▒ϲ▒^
falseDBUSERS▒▒▒▒
