Inserts and updates of records from one database to another work just fine, but when a record is deleted from the source database, the deleted records are never removed from the target database.
I have tombstones enabled:
tombstones.on.delete=true
Any ideas? Some articles say this doesn’t work and that I should use Debezium instead.
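For context, that property sits in my source config roughly like this (a sketch; the connector class and connection details here are placeholders, not my real values):

# Sketch of a Debezium source connector config (properties form).
# Connector class and connection details are placeholders.
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=db-host
database.port=3306
database.user=dbuser
database.password=dbpassword
# topic.prefix is the Debezium 2.x name; older versions use database.server.name
topic.prefix=dbserver1
# emit a tombstone record (null value) after each delete event
tombstones.on.delete=true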
We are using the JDBC Sink Connector to delete table row(s).
In the JSON config, we have (among other settings):
"pk.mode": "record_key"
"pk.fields": "id"
"delete.enabled": "true"
"auto.evolve": "false"
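For context, the relevant part of the sink config in properties form looks roughly like this (connection details and topic name are placeholders, not our real values):

# JDBC Sink Connector: the settings relevant to deletes.
# Connection details and topic name are placeholders.
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=my-topic
connection.url=jdbc:postgresql://db-host:5432/targetdb
connection.user=dbuser
connection.password=dbpassword
# delete.enabled requires the PK to come from the record key
pk.mode=record_key
pk.fields=id
# treat tombstones (null record value) as DELETEs
delete.enabled=true
# upsert is the usual pairing so re-delivered keys update rather than fail
insert.mode=upsert
auto.evolve=false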
Please also ensure your Kafka record is a tombstone (its value equals null) and that the key is the PK of the row you want to delete.
You can also take a look at your Worker’s log: you will see the SQL statements run by the Connector.
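To illustrate, with pk.fields set to id, the delete is triggered by a record shaped roughly like this (the id value here is made up):

# Conceptual shape of a tombstone record (not an actual wire format):
#   key   = {"id": 42}
#   value = null
# The Connector should then run something like: DELETE FROM <table> WHERE id = ?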
Thanks, I will try it out. The only difference in my configuration was that I had "auto.evolve": "true".
I am going to set it to false and monitor the job to see whether deletes come through.
Thanks
@EventStreaming95
Hi, I tried this as you suggested, but it didn’t work on my side. I have pasted my source and sink connector configuration below. Please let me know what I am doing wrong.
Hi,
Thanks for your help, but please forgive me as I’m new to Kafka.
I can’t find the class for transforms.createTombstone.type. Do I have to install a JAR file to be able to use this SMT?
I think your problem is that you have this SMT: transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
That will take these fields: transforms.createKey.fields=ID,NBRKEY
And use them as your PK. This works fine for inserts and updates, but when the sink connector does a delete it relies on the tombstone record, which consists of a message key and a null value. Because you are telling your connector to pull ID and NBRKEY from the message value, which is null for tombstone records, it cannot find the row to delete. I believe your choices are to 1) change your topic so ID and NBRKEY are the Kafka message key (a sketch follows below), or 2) not enable deletes: use another transform to bring the op or deleted fields from the message header into the topic value, and either have your SQL ignore those rows or delete them after the fact.
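For option 1, moving the key-building transform to the source connector might look roughly like this, assuming the source produces flat record values the way the JDBC source connector does (with Debezium, the message.key.columns property plays a similar role for keyless tables):

# Sketch: build the key on the SOURCE connector so every record,
# including tombstones, carries ID and NBRKEY in its key.
# Assumes flat record values (e.g. the JDBC source connector).
transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=ID,NBRKEY

The sink would then use pk.mode=record_key with pk.fields=ID,NBRKEY and no value-based key transform.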
Thanks all.
What you have just described makes sense. I am new to Kafka and teaching myself.
I used the createKey transform because my source table doesn’t have a primary key field, so I had to create one for inserts and updates.
I will try using the message key to see if it works.