Deleting data in database through kafka jdbc connector not working

Inserts and updates of records from one database to another work just fine, but when a record is deleted from the source database, the delete is not propagated: the deleted records are not removed from the target database.

I have tombstones on delete set to true:

tombstones.on.delete=true

Any ideas? Articles are saying it doesn't work and that I should use Debezium instead?

We are using the JDBC Sink Connector to delete table rows.
In the JSON config, we have (among other settings):
"pk.mode": "record_key"
"pk.fields": "id"
"delete.enabled": "true"
"auto.evolve": "false"
Please also ensure your Kafka record is a tombstone (its value is null) and that its key is the PK of the row you want to delete.
You can also take a look at your worker's log: you will see the SQL statements run by the connector.
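For reference, a minimal delete-enabled sink config could look roughly like this (a sketch only; the name, topic, and connection details are placeholders rather than anything from this thread, and your converters must still produce records the sink can read, with a keyed record and a null value for deletes):

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "name": "my-delete-enabled-sink",
  "topics": "my-topic",
  "connection.url": "jdbc:sqlserver://host:1433;databaseName=mydb",
  "connection.user": "user",
  "connection.password": "password",
  "pk.mode": "record_key",
  "pk.fields": "id",
  "delete.enabled": "true",
  "auto.evolve": "false"
}

With that in place, a record whose key carries the id and whose value is null should turn into a DELETE on the target table.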

Thanks, I will try it out. The only difference in my configuration was that I had "auto.evolve": "true".
I am going to set it to false and monitor the job to observe for deletes.
Thanks

@EventStreaming95
Hi, I tried this as you suggested but it didn't work on my side. I have pasted my source and sink connector configurations below. Please let me know what I am doing wrong.

SOURCE:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
connection.password=***********
database.history.kafka.topic=schema-changes.abc
transforms=Cast,createKey
include.schema.changes=true
table.whitelist=TABLE
mode=timestamp
tombstones.on.delete=true
topic.prefix=abc-
decimal.handling.mode=double
database.schema=abc
transforms.createKey.fields=ID,NBRKEY
poll.interval.ms=10
transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
errors.log.enable=true
snapshot.lock.timeout.ms=5000
validate.non.null=false
database.history.kafka.bootstrap.servers=kafka:9092
quote.sql.identifier=always
connection.user=username
numeric.mapping=best_fit
connection.url=jdbc:oracle:thin:@xx.xx.xx.xx:1521:AB
snapshot.mode=initial

SINK:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
transforms.insertTS.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.formatTS.format=yyyy-MM-dd HH:mm:ss
connection.password=***********
tasks.max=6
topics=csa-Table
transforms.formatTS.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms=unwrap,insertTS
transforms.formatTS.field=KAFKA_DT_TS
decimal.handling.mode=double
delete.enabled=true
auto.evolve=false
connection.user=username
poll.interval.ms=10
transforms.unwrap.drop.tombstones=false
auto.create=true
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
connection.url=jdbc:sqlserver://xxxxx:1433;databaseName=xxxxx;
transforms.insertTS.timestamp.field=KAFKA_DT_TS
insert.mode=upsert
pk.mode=record_key
pk.fields=ID,NBRKEY
transforms.formatTS.target.type=string

Can you try to remove

insert.mode=upsert

We don't have it in our config.

I will provide our JSON config soon.

Thank you, please don’t forget to provide the JSON config as you said.

Hi,
I tried insert.mode=insert and also insert.mode=update, but neither propagated the deleted records to the target database.

Here is our config that works perfectly:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "errors.log.include.messages": "true",
  "connection.password": "xxxxxxxxxx",
  "transforms": "createTombstone",
  "max.retries": "10000",
  "retry.backoff.ms": "3000",
  "key.converter.auto.register.schemas": "false",
  "auto.evolve": "false",
  "transforms.createTombstone.type": "cxxxxxxxxxxxxxx$Value",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "errors.log.enable": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "errors.retry.timeout": "-1",
  "topics": "xxxxxxxx",
  "connection.attempts": "100",
  "errors.retry.delay.max.ms": "2000",
  "connection.backoff.ms": "2000",
  "transforms.createTombstone.table.name": "changelog",
  "value.converter.schema.registry.url": "https://xxxxxxxx:8081",
  "delete.enabled": "true",
  "connection.user": "xxxxxxxx",
  "name": "xxxxxxx-xxxxxxx-xxxxxx-CLEANCHANGELOGCONNECTOR-xxxxx",
  "value.converter.auto.register.schemas": "false",
  "auto.create": "false",
  "connection.url": "jdbc:sqlserver://xxxx.xxxx.xxxx:63604;databaseName=xxxxx",
  "pk.mode": "record_key",
  "pk.fields": "id"
}

Please note the createTombstone SMT is there to ensure that the value of the record is null.
Maybe that is your problem: is your record's value really null?

Did you check the worker's log and see the SQL statement run by the connector?
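If it helps, one way to surface that SQL is to raise the JDBC connector's log level on the Connect worker, e.g. in connect-log4j.properties (a sketch only, assuming a stock log4j setup; the logger name is just the connector's package):

log4j.logger.io.confluent.connect.jdbc=DEBUG

At DEBUG the statements the connector runs should show up in the worker log.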

Hi,
Thanks for your help, but please forgive me because I'm new to Kafka.
I can't find the class for transforms.createTombstone.type. Do I have to install a JAR file to be able to use this SMT?

Our SMT is specific to our context and use case.
That transform is not needed in your case.

Please provide the worker's log. Maybe your DB user needs the DELETE privilege on the table.
The worker's log will explain what is happening.

I think your problem is that you have this SMT: transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey

That will take these fields: transforms.createKey.fields=ID,NBRKEY

And use them as your PK. This works fine for inserts and updates, but when the sink connector does a delete it relies on the tombstone record, which consists of a message key and a null value. Because you are telling your connector to pull ID and NBRKEY from the message value, which is null for tombstone records, it cannot find the row to delete. I believe your choices are to either 1) change your topic so that ID and NBRKEY are the Kafka message key, or 2) don't enable deletes, use another transform to bring the op or deleted fields from the message header into the topic value, and either have your SQL ignore those rows or delete them after the fact.

Also, if you go with the second option you will want to add a transform to ignore tombstone records.
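For that tombstone-filtering part, something like the built-in Filter transform plus the RecordIsTombstone predicate on the sink could work (a sketch, assuming Kafka Connect 2.6 or later where predicates are available; the names dropTombstones and isTombstone are arbitrary):

transforms=dropTombstones
transforms.dropTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.dropTombstones.predicate=isTombstone
predicates=isTombstone
predicates.isTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone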

Thanks all.
What you have just described makes sense. I am new to Kafka and teaching myself.
I used the createKey transform because my source table doesn't have a primary key column, so I had to create one for inserts and updates.
I will try using the message key to see if it works.
