Does a tombstone in toTable() propagate to downstream groupBy/aggregate if the key never existed?

Hi,
I have a question about the behavior of tombstones in a Kafka Streams topology when using toTable() followed by groupBy() and aggregate().
Simplified code looks like this:

 <Some 1-partition KStream>
     .process(MyProcessor::new)
     .toTable(...) // with KeyValueStore
     .groupBy((k, v) -> new KeyValue<>(v.a(), v))
     .aggregate(...)

The situation:
In MyProcessor, I forward tombstones for keys that may have never existed in the downstream toTable() store. This is done “just to be sure” that any potential stale data is cleaned up.

My questions:

  1. When toTable() receives a tombstone for a key that never existed in its state store (i.e., store.get(key) returns null), does it forward this tombstone downstream to the groupBy() operator?
  2. If it does propagate, does the aggregate() have to do any meaningful work?
  3. From a performance perspective, if ~33% of my events are such “redundant” tombstones (for non-existent keys), what is the actual cost? We recently experienced a lag of over 90 million events on the internal repartition topic (the one created before toTable(), named like KSTREAM-TOTABLE-0000000XXX-repartition). It took about 1 day to process. The source KStream for this repartition topic has only 1 partition.
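For reference, here is a minimal plain-Java sketch of what I understand toTable()'s update step to do (a map-backed stand-in, not the real Kafka Streams API; class and method names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TombstoneSketch {
    // Stand-in for toTable()'s KeyValueStore and its downstream forwarding.
    static final Map<String, String> store = new HashMap<>();
    static final List<String> forwarded = new ArrayList<>();

    // Sketch of the update logic: read old value, write (or delete), forward (old, new).
    static void processRecord(String key, String newValue) {
        String oldValue = store.get(key);      // null if the key never existed
        if (newValue == null) {
            store.remove(key);                 // tombstone: delete is a no-op for an unknown key
        } else {
            store.put(key, newValue);
        }
        // The open question: is this forwarded even when oldValue == null and newValue == null?
        forwarded.add(key + ": old=" + oldValue + ", new=" + newValue);
    }

    public static void main(String[] args) {
        processRecord("k1", null);             // tombstone for a key that never existed
        System.out.println(forwarded.get(0));  // prints "k1: old=null, new=null"
    }
}
```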

Thanks in advance for any clarification!

In general, toTable() would forward the tombstone blindly (there are some exceptions when using versioned state stores, but versioned stores are not the default, so I guess that doesn’t apply to your case).

Kafka Streams follows an “emit on update” policy, ie, even if a row does not change (this could also be an idempotent put(key, myValue) that “updates” myValue to myValue), the update will be forwarded. (At some point there was a proposal to change to an “emit on change” policy, but it caused issues and was never adopted.)
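To make the difference concrete, here is a plain-Java sketch (map-backed, no Kafka dependency; all names are hypothetical) contrasting emit-on-update with what an emit-on-change variant would do:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class EmitOnUpdateSketch {
    static final Map<String, String> table = new HashMap<>();
    static final List<String> emitted = new ArrayList<>();

    // Emit-on-update: every put is forwarded, even if the value is unchanged.
    static void put(String key, String value) {
        table.put(key, value);
        emitted.add(key + "=" + value);   // no old-vs-new comparison before forwarding
    }

    // Roughly what an emit-on-change variant would do instead: compare first, forward only on change.
    static void putEmitOnChange(String key, String value) {
        String old = table.put(key, value);
        if (!Objects.equals(old, value)) {
            emitted.add(key + "=" + value);
        }
    }

    public static void main(String[] args) {
        put("k", "v");
        put("k", "v");                      // idempotent update: still emitted
        System.out.println(emitted.size()); // prints 2
    }
}
```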

The aggregation would still process the update: this means reading the current aggregation result, writing it back with an idempotent put (as the aggregation result didn’t change), and forwarding the result record again (following the emit-on-update policy).
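A rough sketch of that per-record work, again as plain Java over a map (the subtractor/adder shape mirrors KGroupedTable#aggregate, but the class and types here are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class AggregateSketch {
    static final Map<String, Long> aggStore = new HashMap<>();

    // For each (oldValue, newValue) pair an update carries, the aggregation
    // reads the current result, applies subtractor/adder, writes it back,
    // and forwards it - even when the result does not change.
    static Long process(String groupKey, Long oldValue, Long newValue) {
        Long current = aggStore.getOrDefault(groupKey, 0L);  // state store read
        if (oldValue != null) current -= oldValue;           // subtractor
        if (newValue != null) current += newValue;           // adder
        aggStore.put(groupKey, current);                     // write-back (idempotent if unchanged)
        return current;                                      // forwarded downstream regardless
    }

    public static void main(String[] args) {
        process("g", null, 5L);                // initial add
        System.out.println(process("g", 5L, 5L)); // unchanged update: prints 5
    }
}
```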

How much overhead this implies is hard to say. It depends on many tuning parameters (eg cache size, RocksDB settings, you name it) and also on how you deploy it (ie, what hardware)… If my math is right, assuming 24h of processing time for 90M records, that’s roughly 1K rec/sec. This does sound a little low – KS should be able to do in the range of 10K-50K rec/sec (on a single partition) depending on the operation.
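The back-of-the-envelope math, spelled out (helper name is made up):

```java
public class ThroughputMath {
    // 90M records over 24h: records / seconds, integer division.
    static long recordsPerSecond(long records, long seconds) {
        return records / seconds;
    }

    public static void main(String[] args) {
        long records = 90_000_000L;
        long seconds = 24L * 60 * 60;                       // 86,400 s in 24h
        System.out.println(recordsPerSecond(records, seconds)); // prints 1041
    }
}
```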

So you might want to dig into state store performance, as your workload is most likely I/O bound.

Thanks for the detailed explanation! I have a follow-up regarding the “emit on update” policy in the context of a groupBy operation.

What happens if a tombstone arrives for a key that has never existed in the state store? Since KTable#groupBy needs to extract a grouping key from the message content, does the message still somehow reach the aggregate step?

It’s complicated… Maybe check the code for yourself?

Bottom line is, the tombstone should be dropped on the floor and not get forwarded to the downstream aggregation.
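The reason why, sketched in plain Java (this mimics how the groupBy repartition step handles an (oldValue, newValue) pair; class and method names are invented for illustration): the grouping key is extracted from the value, so with both values null there is nothing to map and nothing is sent to the repartition topic.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupBySketch {
    static final List<String> repartitioned = new ArrayList<>();

    // Sketch of KTable#groupBy's handling of an (oldValue, newValue) update:
    // the old value (if any) is sent for subtraction, the new value (if any)
    // for addition. Both null -> nothing to extract a grouping key from.
    static void process(String key, String oldValue, String newValue) {
        if (oldValue != null) {
            repartitioned.add(extractGroupKey(oldValue) + ":subtract");
        }
        if (newValue != null) {
            repartitioned.add(extractGroupKey(newValue) + ":add");
        }
        // oldValue == null && newValue == null -> dropped on the floor
    }

    static String extractGroupKey(String value) {
        return value.substring(0, 1);   // stand-in for (k, v) -> v.a()
    }

    public static void main(String[] args) {
        process("k1", null, null);                 // tombstone for an unknown key
        System.out.println(repartitioned.size());  // prints 0: nothing forwarded
    }
}
```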
