Does a tombstone in toTable() propagate to downstream groupBy/aggregate if the key never existed?

Hi,
I have a question about the behavior of tombstones in a Kafka Streams topology when using toTable() followed by groupBy() and aggregate().
The simplified code looks like this:

 <Some 1 partition KStream>
     .process(MyProcessor::new)
     .toTable(...) // with KeyValueStore
     .groupBy((k, v) -> new KeyValue<>(v.a(), v))
     .aggregate(...)

The situation:
In MyProcessor, I forward tombstones for keys that may have never existed in the downstream toTable() store. This is done “just to be sure” that any potential stale data is cleaned up.
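
Roughly, MyProcessor does something like the sketch below (simplified, with made-up String types and a placeholder deletion check, not the real code):

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    public class MyProcessor implements Processor<String, String, String, String> {

        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            if (shouldDelete(record.value())) {
                // "Just to be sure" cleanup: forward a tombstone even though the key
                // may never have been written into the downstream toTable() store.
                context.forward(record.withValue(null));
            } else {
                context.forward(record);
            }
        }

        // Placeholder for the real deletion criterion.
        private boolean shouldDelete(final String value) {
            return value == null || value.isEmpty();
        }
    }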

My questions:

  1. When toTable() receives a tombstone for a key that never existed in its state store (i.e., store.get(key) returns null), does it forward this tombstone downstream to the groupBy() operator?
  2. If it does propagate, does the aggregate() have to do any meaningful work?
  3. From a performance perspective, if ~33% of my events are such “redundant” tombstones (for non-existent keys), what is the actual cost? We recently experienced a lag of over 90 million events on the internal repartition topic (the one created before toTable(), named like KSTREAM-TOTABLE-0000000XXX-repartition). It took about 1 day to process. The source KStream for this repartition topic has only 1 partition.

Thanks in advance for any clarification!

In general, toTable() would forward the tombstone blindly (there are some exceptions when you are using versioned state stores, but versioned stores are not the default, so I guess that doesn’t apply to your case).
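
If you want to see it for yourself, a rough TopologyTestDriver sketch like the one below lets you inspect what toTable() forwards for a tombstone whose key was never written (topic names, String serdes, and the trivial topology are made-up placeholders, not taken from your application):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TestOutputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;

    public class ToTableTombstoneCheck {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();

            // Convert the table right back to a stream so we can observe what toTable() forwards.
            builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
                .toTable(Materialized.with(Serdes.String(), Serdes.String()))
                .toStream()
                .to("totable-output", Produced.with(Serdes.String(), Serdes.String()));

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "totable-tombstone-check");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

            try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                final TestInputTopic<String, String> input = driver.createInputTopic(
                    "input", Serdes.String().serializer(), Serdes.String().serializer());
                final TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "totable-output", Serdes.String().deserializer(), Serdes.String().deserializer());

                // Tombstone for a key that has never been written to the table's store.
                input.pipeInput("never-existed", null);

                // Print whatever was forwarded, so you can confirm the behavior yourself.
                System.out.println(output.readKeyValuesToList());
            }
        }
    }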

Kafka Streams follows an “emit on update” policy, i.e., even if a row does not change (this could also be an idempotent put(key, myValue) that updates myValue to the same myValue), the update will be forwarded. (At some point there was a proposal to change to an “emit on change” policy, but that policy caused issues and was never implemented.)
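
You can see emit-on-update with the same sketch: inside the try block, pipe the same key and value twice and read the output again (the test driver processes records one by one; in a real deployment the record cache could conflate consecutive updates):

    // Idempotent update: the second put() does not change the table row,
    // but under emit-on-update it is still forwarded downstream.
    input.pipeInput("k1", "v1");
    input.pipeInput("k1", "v1");
    System.out.println(output.readKeyValuesToList()); // expect ("k1", "v1") twice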

The aggregation would still process the update: this means reading the current aggregation result, doing an idempotent put() to write it back (as the aggregation result didn’t change), and forwarding the result record again (following the emit-on-update policy once more).
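
For reference, this is the general shape of the groupBy()/aggregate() step with adder and subtractor (a hypothetical count-style aggregation over a KTable<String, String> called table, not your actual code); every change that reaches it means one fetch of the current aggregate, one put() back into the aggregation store, and one forwarded result record:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    // Assuming 'table' is the KTable<String, String> returned by toTable():
    final KTable<String, Long> aggregated = table
        .groupBy((k, v) -> new KeyValue<>(v, v), Grouped.with(Serdes.String(), Serdes.String()))
        .aggregate(
            () -> 0L,                            // initializer
            (key, newValue, agg) -> agg + 1L,    // adder: applied to the new value of a change
            (key, oldValue, agg) -> agg - 1L,    // subtractor: applied to the replaced old value
            Materialized.with(Serdes.String(), Serdes.Long()));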

How much overhead this implies is hard to say. It depends on many tuning parameters (e.g., cache size, RocksDB settings, you name it) and also on how you deploy it (i.e., what hardware)… If my math is right, assuming 24h of processing time for 90M records, that is about 90,000,000 / 86,400 s ≈ 1K rec/sec. This does sound a little bit low – Kafka Streams should be able to do in the range of 10K-50K rec/sec on a single partition, depending on the operation.

So you might want to dig into state store performance, as your workload is most likely I/O bound.
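
If you want to experiment, the usual first knobs are the record cache, the commit interval, and RocksDB metrics. A sketch of the relevant configs (the values are placeholders, not recommendations for your workload):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // your application id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");    // your brokers

    // A larger record cache absorbs repeated updates to the same keys before they
    // hit RocksDB and the changelog (newer releases renamed this to statestore.cache.max.bytes).
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 256 * 1024 * 1024L);

    // A longer commit interval gives the cache a chance to actually dedupe updates.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);

    // DEBUG recording level exposes per-store RocksDB metrics, which helps to confirm
    // whether the workload is really I/O bound.
    props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");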