Confluent JDBC plugin - Delete Record using message key

I’m using Kafka Connect with Confluent JDBC plugin, on Postgres, in a “provider agnostic” environment.
Insert and update operations in the source database reflect perfectly in the target database.
Delete operations, however, do not, and that is what this question is about.
I think I understand why it happens, but I cannot figure out how to solve it, if it is possible at all.
(And yes, I know I could use the Debezium plugin, because it works at the log level and does not have this problem, but I want to clarify the scope of the Confluent plugin in order to make future decisions on this subject. And yes, I know there might be a particular database :wink: that contemplates this situation, but here I'm talking about regular relational databases.)

This (Confluent) plugin basically works by detecting changes in the columns of the table that you specify in your source connector configuration (incrementing, timestamp, etc.).
As per this article (indeed a good article!):

Kafka Connect JDBC Sink deep-dive: Working with Primary Keys

in order to propagate a delete from the source to the target, you have to use the Kafka message key, as it is the only way to identify the record to be deleted in the target database, and you have to populate it with the PK of the record you want to delete on the source side. So far, so good.
So you configure your sink connector with:

    'pk.mode'                             = 'record_key',
    'pk.fields'                           = 'whateverfieldyouareusingaspk',
    'insert.mode'                         = 'upsert',
    'delete.enabled'                      = 'true'

… and as soon as the tombstone message (with its null payload) hits the Kafka topic, it should be picked up by Kafka Connect and interpreted as a deletion operation in the target system.

Ok. Pretty clear so far.
Now, how do I have to configure the source connector to generate the tombstone message in Kafka for my source delete operation?

I really can’t see how, because in a regular relational database a delete operation is not going to generate a record with null fields and the PK of the deleted record in the same table from which the record was deleted. (And if it did, you would get a duplicate key error, unless the PK is not a requirement, but I think it is.)
Nor can I tell whether Connect would detect the “change” in the fields, realize that it is an intended delete, and generate the tombstone message.
On the other hand, if I have to write a Kafka producer that manually generates the tombstone record, and invoke it from a database trigger associated with the delete operation on each table of the source database, that would kind of defeat the purpose of having Connect (at least on the source side).

So, here is my dead end, and I would really appreciate your help on this, with a clear “yes you can, and this is how” or, if you want to propagate deletes, “just forget it, and go the Debezium path” :slight_smile:

Of course, there could be one or many wrong assumptions in my interpretation of how this works, and I might also be doing something wrong at the practical level; I’ll be glad to be corrected, because I like to properly understand how things work.
If the answer ends up being “yes, you can”, I’ll gladly include my configuration so we can work on it and see where my mistake is.

Thank you very much!!!
Warm regards!!

if you want to propagate deletes “just forget it, and go the Debezium path”.

This :slight_smile:

If you need a more detailed answer, read on.

The Confluent JDBC Source connector doesn’t implement proper CDC. The connector is as simple as it can be: it uses your “incrementing” or “timestamp” column as the delta criterion and then executes a really simple SQL query.

Depending on the “mode”, the query looks slightly different.

incrementing mode:

Begin using SQL query: SELECT * FROM TABLE1 WHERE TABLE1.I > $incrementing ORDER BY TABLE1.I ASC

incrementing+timestamp mode:

Begin using SQL query: SELECT * FROM TABLE1 WHERE TABLE1.T < $current_timestamp AND ((TABLE1.T = $timestamp AND TABLE1.I > $incrementing) OR TABLE1.T > $timestamp) ORDER BY TABLE1.T,TABLE1.I ASC

I won’t post the query for timestamp mode here: first, I don’t have it; second, it wouldn’t change anything; and third, I don’t see a use case for this mode, since it can easily lose data.

This basically means that if you DELETE a record from your source table, the event cannot be captured by the connector by design, because the deleted row will never be returned by the SELECT query used for capturing data changes.

You may, however, work around it by, for example, implementing the Outbox Pattern or something similar. The idea is to base your connector on a “special” queue table instead of your real table. You write events to the queue table using a trigger, and you can implement a special type of event for the DELETE operation.
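To make the idea concrete, here is a minimal Postgres sketch of such an outbox table and trigger, assuming a hypothetical source table `customers(id int primary key, name text)` — all table, column, and function names here are illustrative, not from the original post:

```sql
-- Queue ("outbox") table the JDBC Source connector would read from.
-- event_id is a monotonically increasing column, suitable for incrementing mode.
CREATE TABLE customers_outbox (
    event_id   bigserial   PRIMARY KEY,          -- incrementing column for the connector
    event_time timestamptz NOT NULL DEFAULT now(),
    op         char(1)     NOT NULL,             -- 'I', 'U' or 'D'
    id         int         NOT NULL,             -- PK of the affected customers row
    name       text                              -- payload; NULL for deletes
);

-- Trigger function: records every change, including deletes, as an event row.
CREATE OR REPLACE FUNCTION customers_to_outbox() RETURNS trigger AS $$
BEGIN
    IF (TG_OP = 'DELETE') THEN
        INSERT INTO customers_outbox (op, id) VALUES ('D', OLD.id);
        RETURN OLD;
    ELSE
        INSERT INTO customers_outbox (op, id, name)
        VALUES (left(TG_OP, 1), NEW.id, NEW.name);  -- 'I' for INSERT, 'U' for UPDATE
        RETURN NEW;
    END IF;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_customers_outbox
AFTER INSERT OR UPDATE OR DELETE ON customers
FOR EACH ROW EXECUTE FUNCTION customers_to_outbox();
```

Note that `EXECUTE FUNCTION` requires Postgres 11+; on older versions use `EXECUTE PROCEDURE`. The key point is that a DELETE on the real table now produces an INSERT on the queue table, which the incrementing SELECT query can pick up.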

You still won’t be able to write tombstone messages to your destination topic with this approach and the JDBC Source connector, but you can organize your topic records (events) in a way that makes them replayable on the target side.
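The source connector would then point at the queue table in incrementing mode. A sketch in the same notation as the sink config earlier, assuming a hypothetical outbox table `customers_outbox` with a serial `event_id` column:

```
    'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
    'mode'                     = 'incrementing',
    'incrementing.column.name' = 'event_id',
    'table.whitelist'          = 'customers_outbox'
```

Since events are only ever inserted into the outbox, the connector’s incrementing query captures every operation, deletes included.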

For instance, you can use an “outbox” table on the source side and an “inbox” table on the target side. This means you would need another trigger, on the “inbox” table, which replays the incoming events and writes the data to your destination tables.
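A matching Postgres sketch of the replay trigger on the target side, assuming the sink connector lands the source events in a table `customers_inbox(event_id, event_time, op, id, name)` where `op` is 'I', 'U' or 'D', and the real target table is `customers(id int primary key, name text)` — again, all names are illustrative:

```sql
-- Replay function: applies each incoming event to the real target table.
CREATE OR REPLACE FUNCTION customers_replay() RETURNS trigger AS $$
BEGIN
    IF (NEW.op = 'D') THEN
        DELETE FROM customers WHERE id = NEW.id;       -- propagate the delete
    ELSE
        INSERT INTO customers (id, name)               -- insert or update
        VALUES (NEW.id, NEW.name)
        ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_customers_replay
AFTER INSERT ON customers_inbox
FOR EACH ROW EXECUTE FUNCTION customers_replay();
```

Because events arrive in `event_id` order per table, replaying them row by row keeps the target table consistent with the source.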

If you don’t want to do all this, but still want to propagate deletes,

just forget it, and go the Debezium path.



Thank you very much for your answer !!!
I’ll take your advice, keep it as simple as possible, and try the Debezium/log way.
Just wanted to be sure not to be missing something important.
All the best!!

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.