Question
Does the Kafka Streams API provide any guarantees about how/when state store changes are replicated to the Kafka broker under the at-least-once processing guarantee?
Background
Assumptions that I have:
- State store changelog records are treated like any other output record produced by the Kafka Streams application. This means it is nondeterministic when state store changes are published relative to other output records - it is purely up to the producer how it buffers and publishes records to the broker.
- It is guaranteed that consumer offsets are committed only after all output records (including state store changes) have been published. This does not depend on the processing guarantee used by the Kafka Streams application, i.e., it also applies to the at-least-once processing guarantee (the default configuration, shown below).
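For reference, this is the processing guarantee I mean - as far as I know it is the default, so setting it explicitly as below is just for clarity (application id and bootstrap servers are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// At-least-once is the default processing guarantee; set explicitly here only for clarity.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");              // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
```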
Possible issue
Since the publishing of output records to the Kafka broker is nondeterministic, it is entirely possible that only some of the output records (resulting from a single input record) are published to the broker before the Kafka Streams application crashes or stream tasks (partitions) are rebalanced. I.e., it is possible that:
- An output record gets recorded on the broker without the corresponding changelog record.
- A changelog record gets recorded without the corresponding output record.
Of course, since Kafka guarantees at-least-once processing, the same input record will be picked up again after the restart/rebalancing, but it is nondeterministic what state the state store will be in (because we don't know whether the changelog record was published or not).
Scenario
Assume we have a sub-topology in a Kafka Streams application that takes an input record and deletes the corresponding record in a state store (e.g., by the input record's key). On top of that, depending on whether there was a record in the state store or not (the delete operation returns the previous value), we produce a "true" (record deleted) or "false" (no record found) output record; a rough sketch of such a processor follows the list below. The situation we may end up in is:
- There is a record in the state store.
- An input record comes in and we delete the corresponding record in the state store.
- The state store change gets published to the broker and the application crashes without publishing the output record ("true"/"false").
- On application restart the state store is rebuilt from the changelog topic, but since the changelog now contains a delete event, the state store no longer contains the record that was deleted while processing the input record in step 2.
- The same input record gets reprocessed, since the consumer offsets were not committed (due to the crash), but now the corresponding record is not present in the state store and a "false" result is produced instead of "true".
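To make the scenario concrete, here is a rough sketch (Processor API; store name and types are simplified placeholders) of the kind of processor I have in mind:

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Deletes the entry for the incoming key and forwards "true" if an entry
// existed, "false" otherwise ("my-store" is just a placeholder name).
public class DeleteAndReportProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        this.store = context.getStateStore("my-store");
    }

    @Override
    public void process(final Record<String, String> record) {
        // delete() returns the previous value, or null if the key was absent.
        // The delete produces a changelog record; the forward produces the output
        // record - the question is about the order in which these reach the broker.
        final String previous = store.delete(record.key());
        context.forward(record.withValue(previous != null ? "true" : "false"));
    }
}
```

The crash in the scenario above would then happen after the changelog record for delete() has reached the broker but before the forwarded "true"/"false" record has.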
Please correct me if any of my assumptions are wrong. Even if they are all correct, I think such a situation can be avoided with different processing logic, e.g., by first publishing just the result (true/false) and deleting the entry in the state store only after the result has been published (in another sub-topology).