CDC delete row issue

Hi
I’m using the Debezium SQLServer CDC source plugin and KSQL - it’s producing messages to a topic representing rows in a table.
From there, I set up some streams and tables:

SET 'auto.offset.reset' = 'earliest';
/* source collection */
CREATE STREAM s_emp_from_debezium (empno INT, ename VARCHAR, dept BIGINT) WITH (KAFKA_TOPIC='mssqllatest.dbo.emp', VALUE_FORMAT='JSON');
/* derived collection */
CREATE STREAM s_emp_repart WITH(KAFKA_TOPIC='employees_repart', VALUE_FORMAT='JSON', PARTITIONS=1) AS SELECT * FROM s_emp_from_debezium PARTITION BY empno;
/* create a non-queryable table off the repart topic */
CREATE TABLE t_emp (empno INT PRIMARY KEY, ename VARCHAR, dept BIGINT) WITH (KAFKA_TOPIC='employees_repart', VALUE_FORMAT='JSON');
/* create queryable (derived) table */
CREATE TABLE q_emp AS SELECT * FROM t_emp;

Then I query the table using ‘select * from q_emp’; It returns the rows of the table - great!
Over on My SQLServer I then update a row … and re-run the select in KSQL … the update shows up … great! I
Then try inserting a new row over on SQLServer … and that too shows up when I re-run the select … all great!
However, when I DELETE a row in SQLServer, it continues to show up in the KSQL query!

I also tried watching the table row count as I deleted rows of the table in SQLServer … no change:

/* push query to watch row count */
SELECT COUNT(*) FROM q_emp GROUP BY 1 EMIT CHANGES;

I have the CDC connector configured like this:

    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.server.name": "mssqllatest",
    "database.dbname": "demo",
    "database.hostname": "192.168.0.236",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "Toughpass1!",
    "database.instance": "mssqllatest",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.mssqllatest",
    "table.include.list": "dbo.emp",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones" : "false",
    "transforms.unwrap.delete.handling.mode" : "none"

When I delete a row… a second message appears in the initial topic for that key, this time with ‘null’ for the value , so the CDC is doing what it is supposed to do… ie writing <K,NULL> to the initial topic…

Looking in the derived stream’s topic, employees_repart, I don’t see the deleted row getting propagated … aren’t derived collections supposed to ‘stay in sync’ with parent collections? This explains why the queries don’t look right of course.
So what am I missing here?
So far I have not been able to create a TABLE based on this incoming CDC stream that behaves like a table should … works great for INSERT/UPDATE but so far not DELETE…

Is there some other magic step/config I have missed?

Free beer for anyone that can show me a simple case of CDC working via a KSQL TABLE query for INSERT, UPDATE AND DELETE …

Thanks!

seems to be working like this (on Docker):

name = SqlServerCDCSrc
connector.class = io.debezium.connector.sqlserver.SqlServerConnector
database.server.name = mssqllatest
database.dbname = demo
database.hostname = 192.168.0.236
database.port = 1433
database.user = sa
database.password = Toughpass1!
database.instance = mssqllatest
database.history.kafka.bootstrap.servers = broker:29092
database.history.kafka.topic = dbhistory.mssqllatest
table.include.list = dbo.emp
key.converter = io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url = http://schema-registry:8081
value.converter = io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url = http://schema-registry:8081
transforms = unwrap
transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones= true
transforms.unwrap.delete.handling.mode= none

closed

1 Like

This topic was automatically closed after 30 days. New replies are no longer allowed.