Hi
I’m using the Debezium SQL Server CDC source connector with KSQL. The connector produces messages to a topic, each representing a row in a table.
From there, I set up some streams and tables:
SET 'auto.offset.reset' = 'earliest';
/* source collection */
CREATE STREAM s_emp_from_debezium (empno INT, ename VARCHAR, dept BIGINT) WITH (KAFKA_TOPIC='mssqllatest.dbo.emp', VALUE_FORMAT='JSON');
/* derived collection */
CREATE STREAM s_emp_repart WITH(KAFKA_TOPIC='employees_repart', VALUE_FORMAT='JSON', PARTITIONS=1) AS SELECT * FROM s_emp_from_debezium PARTITION BY empno;
/* create a non-queryable table off the repart topic */
CREATE TABLE t_emp (empno INT PRIMARY KEY, ename VARCHAR, dept BIGINT) WITH (KAFKA_TOPIC='employees_repart', VALUE_FORMAT='JSON');
/* create queryable (derived) table */
CREATE TABLE q_emp AS SELECT * FROM t_emp;
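To pin down the behaviour expected here, below is a minimal Python model (not ksqlDB code) of how a table is derived from a changelog topic. The latest value per key wins, and a record whose value is null (a tombstone, <K,NULL>) deletes the key. The employee names and departments are hypothetical sample data.

```python
from typing import Dict, Iterable, Optional, Tuple

# A changelog record: (key, value); a value of None models a tombstone <K, NULL>.
Record = Tuple[int, Optional[dict]]

def materialize(changelog: Iterable[Record]) -> Dict[int, dict]:
    """Fold a changelog into table state: the latest value per key wins,
    and a None value removes the key."""
    table: Dict[int, dict] = {}
    for key, value in changelog:
        if value is None:
            table.pop(key, None)  # tombstone: delete the row if present
        else:
            table[key] = value    # insert or update (upsert)
    return table

# Hypothetical change sequence for the emp table.
changelog = [
    (1, {"empno": 1, "ename": "SMITH", "dept": 10}),  # INSERT
    (2, {"empno": 2, "ename": "JONES", "dept": 20}),  # INSERT
    (1, {"empno": 1, "ename": "SMITH", "dept": 30}),  # UPDATE
    (2, None),                                        # DELETE
]
print(materialize(changelog))
# {1: {'empno': 1, 'ename': 'SMITH', 'dept': 30}}
```

In this model a DELETE only removes a row if the tombstone actually reaches the topic the table is built from.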
Then I query the table with ‘SELECT * FROM q_emp;’ and it returns the rows of the table. Great!
Over in SQL Server I then update a row and re-run the SELECT in KSQL: the update shows up. Great!
I then insert a new row in SQL Server, and that too shows up when I re-run the SELECT. All great!
However, when I DELETE a row in SQL Server, it continues to show up in the KSQL query!
I also watched the table’s row count while deleting rows in SQL Server. No change:
/* push query to watch row count */
SELECT COUNT(*) FROM q_emp GROUP BY 1 EMIT CHANGES;
I have the CDC connector configured like this:
{
  "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "database.server.name": "mssqllatest",
  "database.dbname": "demo",
  "database.hostname": "192.168.0.236",
  "database.port": "1433",
  "database.user": "sa",
  "database.password": "Toughpass1!",
  "database.instance": "mssqllatest",
  "database.history.kafka.bootstrap.servers": "kafka:9092",
  "database.history.kafka.topic": "dbhistory.mssqllatest",
  "table.include.list": "dbo.emp",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "none"
}
When I delete a row, a second message appears in the initial topic for that key, this time with null as the value. So the CDC connector is doing what it is supposed to do, i.e. writing <K,NULL> to the initial topic.
Looking in the derived stream’s topic, employees_repart, I don’t see the delete being propagated. Aren’t derived collections supposed to ‘stay in sync’ with their parent collections? This of course explains why the queries don’t look right.
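The symptom can be reproduced with a small Python model (not ksqlDB code). It assumes, consistent with what is observed in employees_repart, that the stream stage drops null-value records. That is plausible because PARTITION BY empno reads empno from the value, and a tombstone has no value columns to read it from. The source-topic key format and the sample rows are hypothetical.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# (key, value) pairs; a value of None models a tombstone <K, NULL>.
Record = Tuple[object, Optional[dict]]

def stream_repartition(records: Iterable[Record]) -> List[Record]:
    """Model of s_emp_repart: re-key each row by the empno column of its value.
    Assumption: a null-value record is dropped, since it has no empno to read."""
    out: List[Record] = []
    for _key, value in records:
        if value is None:
            continue  # the tombstone never reaches employees_repart
        out.append((value["empno"], value))
    return out

def materialize(changelog: Iterable[Record]) -> Dict[object, dict]:
    """Table state: the latest value per key wins; a None value deletes the key."""
    table: Dict[object, dict] = {}
    for key, value in changelog:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table

# Hypothetical contents of mssqllatest.dbo.emp after an INSERT, UPDATE and DELETE.
source_topic = [
    ('{"empno":1}', {"empno": 1, "ename": "SMITH", "dept": 10}),
    ('{"empno":1}', {"empno": 1, "ename": "SMITH", "dept": 20}),
    ('{"empno":1}', None),
]

repart_topic = stream_repartition(source_topic)
t_emp = materialize(repart_topic)
print(len(repart_topic), len(t_emp))     # 2 1: the DELETE never arrived
print(len(materialize(source_topic)))    # 0: folding the source topic drops the row
```

In this model the table built over employees_repart keeps the deleted row (so a COUNT over it would not change), while the same fold applied directly to the source topic removes it.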
So what am I missing here?
So far I have not been able to create a TABLE over this incoming CDC stream that behaves like a table should: it works for INSERT and UPDATE, but not for DELETE.
Is there some other magic step/config I have missed?
Free beer for anyone who can show me a simple case of CDC working through a KSQL TABLE query for INSERT, UPDATE and DELETE!
Thanks!