Materialised table pull queries with Tombstones

Hi,

We have run into what I would consider inconsistent behavior when running a pull query against a materialised table whose data has been tombstoned. I have put together a quick example that hopefully demonstrates the behavior.

If there is a tombstoned record in a topic that is backing a materialised table, a push query against the table excludes the record, but a pull query (full scan or by key) returns the record that preceded the tombstone. Tombstoning has effectively been done by following this blog.

Returning the previous record feels counterintuitive. I suspect it may be caused by the tombstone record being created without a schema while the table has a value format of avro. However, that doesn’t really explain why it works for push queries and not pull queries. Is this a bug, or is it behaving as intended?

Example to replicate:

create stream `example_stream`(
    `key` string KEY,
    `value1` string,
    `isDeleted` boolean
) with (
    kafka_topic='example', 
    value_format='avro',
    partitions=1,
    replicas=1
);

create table `example_materialised_table` with (
    kafka_topic='example-materialised',
    value_format='avro',
    partitions=1,
    replicas=1) as
select `key`,
        latest_by_offset(`value1`, false) as `value1`,
        latest_by_offset(`isDeleted`, false) as `value2`
from `example_stream`
group by `key`
emit changes;

insert into `example_stream` values ('1', 'record1', false);
insert into `example_stream` values ('2', 'record2', false);
insert into `example_stream` values ('3', 'record3', false);
insert into `example_stream` values ('2', 'record2', true);
/* I suspect that, because of the simplistic way this example is set up, the "delete" event needs to be created before the tombstoning stream, otherwise there is a race condition. This is probably best solved by adding a where isDeleted = false to the table definition. */
create stream `example_materialised_deletes_stream`
with (kafka_topic='example-materialised',
       value_format='kafka') as
select `key`, 
        cast(NULL as varchar) 
from `example_stream`
where `isDeleted` = True
emit changes;

/*push excludes tombstone*/
 select * from  `example_materialised_table` emit changes;

/*pull includes*/
set 'ksql.query.pull.table.scan.enabled'='true';
select * from  `example_materialised_table`;

select * from  `example_materialised_table` where `key` = '2';

Thanks,

Leander

Using latest_by_offset computes an aggregation on an input STREAM – thus, tombstone semantics don’t apply, because the input is a STREAM; tombstones only apply to input TABLES.

The “tombstone” cannot be processed by the aggregation and the record is dropped as malformed. Thus the result table example_materialised_table is not updated for this case (and nothing is deleted; as a matter of fact, an aggregation cannot semantically delete anything in the result table).

/*push excludes tombstone */
select * from `example_materialised_table` emit changes;

Not sure what you mean by this? You cannot write <key,null> via INSERT VALUES, and thus your input example_stream won’t contain <key,null>. And if the filter evaluates to false, the input record is just dropped.

If you want to read a topic as a table to query it via pull queries, you should use the new CREATE SOURCE TABLE feature: https://www.confluent.io/blog/ksqldb-0-22-new-features-major-upgrades/

If you are using an older version, you could also do:

CREATE TABLE nonQueryable <schema> WITH(...);
CREATE TABLE queryable AS SELECT * FROM nonQueryable;

For this case, the input is a TABLE and thus tombstone semantics apply.
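
As a rough sketch of both options against the topic from the original post: the table names `example_source_table`, `example_non_queryable` and `example_queryable` are made up for illustration, and the column list assumes the Avro schema written by the CTAS in the question, so adjust it to whatever the topic actually contains.

```sql
-- Option 1 (ksqlDB 0.22+): a source table is read-only and materialised,
-- so it can be used in pull queries directly.
CREATE SOURCE TABLE `example_source_table` (
    `key` STRING PRIMARY KEY,
    `value1` STRING,
    `value2` BOOLEAN
) WITH (
    kafka_topic='example-materialised',
    value_format='avro'
);

-- Option 2 (older versions): declare the topic as a plain table first...
CREATE TABLE `example_non_queryable` (
    `key` STRING PRIMARY KEY,
    `value1` STRING,
    `value2` BOOLEAN
) WITH (
    kafka_topic='example-materialised',
    value_format='avro'
);

-- ...then materialise it with a CTAS so that pull queries can be served.
CREATE TABLE `example_queryable` AS
    SELECT * FROM `example_non_queryable`;

-- Pull query against the materialised copy (hypothetical table name).
SELECT * FROM `example_queryable` WHERE `key` = '2';
```

In both variants the input is declared as a TABLE, so a <key,null> record on the topic is treated as a delete rather than dropped.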

Hi,

Thanks for the response. So if I understand correctly what you are saying, the materialised table is not using the changelog to return the data? And it is also not possible to delete a key from a materialised table?

The “deletion” stream outlined above is inserting a record onto the table’s changelog topic, with a “key” and null as the “value” object, i.e. a tombstone.

From your description it sounds like the push query should then be returning an event for key 2, so it sounds like that part is potentially broken then?

I have attached two screenshots, one of the end state of the change log topic and one of the output of the different queries and you can see that the behaviour is different for the push and pull queries.

Ok, as we are trialling the Confluent community cloud and therefore don’t have access to KSQL 0.22 yet, it sounds like to get to a position where we could achieve something that would give us an aggregated “now” view of the data that we are dealing with where we can utilise tombstones, we would need to stream the changelog from the materialised table to a new topic where we can add the tombstones and then overlay a table and a queryable table?

the materialised table is not using the changelog to return the data?

It depends. For “pull queries” it does not use the changelog, but RocksDB is queried directly. For “push queries” the changelog topic would be read.

and it is also not possible to delete a key from a materialised table?

Correct.

The “deletion” stream outlined above is inserting a record onto the table’s changelog topic, with a “key” and null as the “value” object, i.e. a tombstone.

Well, those records are only written into the topic. But the RocksDB state that is used by “pull queries” is not updated. (It’s actually an interesting “hack” you do here, to produce tombstones…)

For your queries:

select * from `example_materialised_table` emit changes;
It will read the topic. It seems key=2 was already compacted and thus it’s not showing up.

select * from `example_materialised_table`;
This queries RocksDB, which still contains key=2 because the tombstone is only written to the topic and does not update RocksDB.

select * from `example_materialised_table` where `key` = '2';
Same as last query → uses RocksDB, not the topic.

Ok, as we are trialling the Confluent community cloud and therefore don’t have access to KSQL 0.22 yet

Not sure if I can follow? 0.22 was released in Confluent Cloud as well as a standalone release.

where we could achieve something that would give us an aggregated “now” view of the data that we are dealing with where we can utilise tombstones, we would need to stream the changelog from the materialised table to a new topic where we can add the tombstones and then overlay a table and a queryable table?

Sounds about right.
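
A minimal sketch of what that pipeline could look like, reusing the tombstone “hack” from the original post. Everything named here is an assumption for illustration: the topic `example-with-tombstones`, the stream and table names, and the column list (taken from the CTAS in the question). I have not run this against a cluster.

```sql
-- 1. Expose the aggregate's changelog topic as a stream.
CREATE STREAM `example_changelog_stream` (
    `key` STRING KEY,
    `value1` STRING,
    `value2` BOOLEAN
) WITH (
    kafka_topic='example-materialised',
    value_format='avro'
);

-- 2. Copy the rows that are not flagged as deleted to a new topic.
CREATE STREAM `example_live_stream` WITH (
    kafka_topic='example-with-tombstones',
    value_format='avro',
    partitions=1,
    replicas=1
) AS
SELECT * FROM `example_changelog_stream`
WHERE `value2` = false
EMIT CHANGES;

-- 3. Write <key,null> to the same topic for rows flagged as deleted
--    (same trick as in the question: a single NULL column in 'kafka' format).
CREATE STREAM `example_tombstone_stream` WITH (
    kafka_topic='example-with-tombstones',
    value_format='kafka'
) AS
SELECT `key`, CAST(NULL AS VARCHAR)
FROM `example_changelog_stream`
WHERE `value2` = true
EMIT CHANGES;

-- 4. Overlay a table on the new topic, then materialise it for pull queries.
CREATE TABLE `example_now_input` (
    `key` STRING PRIMARY KEY,
    `value1` STRING,
    `value2` BOOLEAN
) WITH (
    kafka_topic='example-with-tombstones',
    value_format='avro'
);

CREATE TABLE `example_now` AS
    SELECT * FROM `example_now_input`;
```

Because the input of step 4 is a TABLE, the tombstones written in step 3 are applied as deletes. Note that steps 2 and 3 are separate queries, so the relative order of their writes to the new topic is not guaranteed.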
