We have run into what I would consider inconsistent behavior when running a pull query against a materialised table whose data has been tombstoned. I have put together a quick example that hopefully demonstrates the behavior.
If a record in the topic backing a materialised table has been tombstoned, a push query against the table excludes that record. A pull query, however, whether a full table scan or a lookup by key, returns the record that preceded the tombstone. The tombstoning itself was done by following this blog.
Returning the previous record feels counterintuitive. I suspect it may be caused by the tombstone record being written without a schema while the table has a value format of Avro. However, that doesn't really explain why it works for push queries and not for pull queries. Is this a bug, or is it behaving as intended?
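One way to check the schema theory is to look at the raw records on the table's backing topic. Below is a minimal sketch using the ksqlDB `PRINT` statement, assuming the topic name `example-materialised` from the example below. It should show whether a record with a null value was actually written for key 2 after the earlier Avro-encoded records for that key.

```sql
-- Dump every record on the table's backing topic from the start,
-- including any null-valued (tombstone) records; stop with Ctrl+C.
print 'example-materialised' from beginning;
```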
Example to replicate:
create stream `example_stream` (
  `key` string KEY,
  `value1` string,
  `isDeleted` boolean
) with (kafka_topic='example', value_format='avro', partitions=1, replicas=1);

create table `example_materialised_table` with (kafka_topic='example-materialised', value_format='avro', partitions=1, replicas=1) as
  select `key`,
         latest_by_offset(`value1`, false) as `value1`,
         latest_by_offset(`isDeleted`, false) as `value2`
  from `example_stream`
  group by `key`
  emit changes;

insert into `example_stream` values ('1', 'record1', false);
insert into `example_stream` values ('2', 'record2', false);
insert into `example_stream` values ('3', 'record3', false);
insert into `example_stream` values ('2', 'record2', true);

/* Due to, I suspect, the simplistic way I have set this example up, the "delete" event needs to be inserted before the tombstoning stream is created, otherwise this leads to a race condition. Probably best solved, TBH, by adding a where isDeleted = False to the table definition. */
create stream `example_materialised_deletes_stream` with (kafka_topic='example-materialised', value_format='kafka') as
  select `key`, cast(NULL as varchar)
  from `example_stream`
  where `isDeleted` = True
  emit changes;

/* push query excludes the tombstoned record */
select * from `example_materialised_table` emit changes;

/* pull queries include it */
set 'ksql.query.pull.table.scan.enabled'='true';
select * from `example_materialised_table`;
select * from `example_materialised_table` where `key` = '2';
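The comment in the example suggests avoiding the race condition by filtering out the "delete" events in the table definition. A sketch of that variant, replacing the `create table` statement above, might look like the following. It targets only the race between the two queries writing to `example-materialised`, not the pull query behavior itself.

```sql
-- Variant of the table definition with the suggested filter: events with
-- isDeleted = true never reach the aggregation, so only the tombstoning
-- stream writes anything to the topic for a "deleted" key.
create table `example_materialised_table` with (kafka_topic='example-materialised', value_format='avro', partitions=1, replicas=1) as
  select `key`,
         latest_by_offset(`value1`, false) as `value1`,
         latest_by_offset(`isDeleted`, false) as `value2`
  from `example_stream`
  where `isDeleted` = false
  group by `key`
  emit changes;
```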