Kafka state stores and at-least-once processing guarantee

Question

Does the Kafka Streams API provide any guarantees about how/when state store changes are replicated to the Kafka broker under the at-least-once processing guarantee?

Background

Assumptions that I have:

  1. State store changelog records are treated like any other output record produced by the Kafka Streams application. This means that it is nondeterministic when state store changes are published relative to other output records - it is purely up to the producer how it buffers and publishes records to the broker (see the store declaration sketch after this list).
  2. It is guaranteed that consumer offsets are committed only after all output records (including state store changes) have been published. This does not depend on the processing guarantee used by the Kafka Streams application, i.e., this applies to the at-least-once processing guarantee as well.
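
For context, this is roughly how I declare such a changelogged store (the store name and the changelog-topic config override are made up); as far as I understand, its changelog topic ("<application.id>-delete-store-changelog") is written by the same task producer as regular output records:

```java
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ChangeloggedStore {

    // Builds a persistent key-value store; changelogging is enabled by default,
    // so every write/delete also produces a record to the changelog topic.
    public static StoreBuilder<KeyValueStore<String, String>> storeBuilder() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("delete-store"),   // hypothetical store name
                Serdes.String(), Serdes.String())
            // Shown explicitly here with an illustrative changelog-topic config override.
            .withLoggingEnabled(Map.of("min.insync.replicas", "2"));
    }
}
```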

Possible issue

Since publishing output records to the Kafka broker is nondeterministic, it is entirely possible that only some of the output records (resulting from a single input record) are published to the broker before the Kafka Streams application crashes or stream tasks (partitions) are rebalanced. I.e., it is possible that:

  • An output record gets recorded on the broker without the corresponding changelog record.
  • A changelog record gets recorded without the corresponding output record.

Of course, since Kafka guarantees at-least-once processing, the same input record will be picked up after the restart/rebalancing, but it is nondeterministic what state the state store will be in (because we don’t know whether the changelog record was published or not).

Scenario
Assume we have a sub-topology in a Kafka Streams application which takes an input record and deletes the corresponding record in a state store (e.g., by the input record key). On top of that, depending on whether there was a record in the state store or not (the delete operation returns the previous value), we produce a “true” (record deleted) or “false” (no record found) output record; a minimal sketch of such a sub-topology follows the numbered list below. The situation we may end up in is:

  1. There is a record in the state store.
  2. An input record comes in and we delete the corresponding record in the state store.
  3. The state store change gets published to the broker and the application crashes without publishing the output record (“true”/“false”).
  4. On application restart the state store is rebuilt from the changelog topic, but since the changelog now contains the delete event, the state store no longer contains the deleted record (which was removed while processing the input record in step 2).
  5. The same input record gets reprocessed, since consumer offsets were not committed (due to the crash), but now the corresponding record is not present in the state store and a “false” result is produced instead of “true”.
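
Something like the following is what I have in mind: a rough Processor API sketch (assuming Kafka Streams 3.3+, where KStream#process returns a stream; the store and topic names are made up):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class DeleteAndReportTopology {

    static final String STORE = "delete-store"; // hypothetical store name

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Changelogging is enabled by default, so every delete() below also
        // queues a tombstone record for the store's changelog topic.
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE),
            Serdes.String(), Serdes.String()));

        ProcessorSupplier<String, String, String, String> deleteProcessor =
            () -> new Processor<String, String, String, String>() {
                private ProcessorContext<String, String> context;
                private KeyValueStore<String, String> store;

                @Override
                public void init(ProcessorContext<String, String> ctx) {
                    context = ctx;
                    store = ctx.getStateStore(STORE);
                }

                @Override
                public void process(Record<String, String> record) {
                    // Step 2: delete() returns the previous value (or null) and
                    // a changelog write is queued on the task's producer.
                    String previous = store.delete(record.key());
                    // The "true"/"false" output record from the scenario.
                    context.forward(record.withValue(previous != null ? "true" : "false"));
                }
            };

        builder.stream("deletions", Consumed.with(Serdes.String(), Serdes.String()))
               .process(deleteProcessor, STORE)
               .to("deletion-results", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```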

Please correct me if any of my assumptions are wrong. Even if they are all correct, I think such a situation can be avoided with different processing logic, e.g., ensuring that we first just publish the result (true/false) and delete the entry in the state store only once the result is published (in another sub-topology).

Yes :slight_smile: – But it also depends on your config. Kafka Streams will flush all pending writes into all topics (output/repartition/changelogs) before committing the corresponding offsets on the input topics. Hence, your producer and topic configs determine the exact guarantees you get for the flush() operation.
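
For illustration only, a rough sketch of the kind of settings meant here (the application id, bootstrap servers, and the concrete values are placeholders, not recommendations):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class AtLeastOnceConfig {

    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "delete-and-report");   // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
        // Embedded producer: require full ISR acknowledgement for flushed writes.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
        props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), "true");
        // Internal topics (changelogs/repartition) created by Kafka Streams:
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        props.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), "2");
        return props;
    }
}
```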

It is guaranteed that consumer offsets are committed only after all output records (including state store changes) have been published

Yes. See above.

  • An output record gets recorded on the broker without the corresponding changelog record.
  • A changelog record gets recorded without the corresponding output record.

In case of an error, both scenarios are possible.

Of course, since Kafka guarantees at-least-once processing, the same input record will be picked up after the restart/rebalancing, but it is nondeterministic what state the state store will be in (because we don’t know whether the changelog record was published or not).

Correct.

Even if they are all correct, I think such a situation can be avoided with different processing logic, e.g., ensuring that we first just publish the result (true/false) and delete the entry in the state store only once the result is published (in another sub-topology).

Might be possible. But it seems easier to use exactly-once and avoid the hassle?
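
For comparison, if you did go that route it would essentially be a one-line config change (assuming Kafka Streams 3.0+ and brokers 2.5+):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfig {

    public static Properties props() {
        Properties props = new Properties();
        // "exactly_once_v2" requires brokers 2.5 or newer; the older
        // "exactly_once" setting is deprecated.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```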

Thanks a lot for the answer!

Re the exactly-once processing guarantee - we try to avoid it and instead make our processes idempotent, because of issues with Kafka transactions that we faced previously. This is partly discussed in KIP-664: Provide tooling to detect and abort hanging transactions - Apache Kafka - Apache Software Foundation.

Now, on second thought, extracting the delete into a separate sub-topology might not work either, since the following is possible:

  1. The input record is processed and the output (“true”/“false”) gets recorded on the broker, but the consumer offset commit fails.
  2. The application gets restarted and now there is a race condition between the reprocessing of the input record and the delete of the record in the other sub-topology (since the output message has already been produced). If the delete takes place first, we will again end up with a wrong output (actually two outputs: the first saying “true”, the second “false”). Maybe we should never delete anything at all… :slight_smile:

The issue with these corner cases is that at first they seem fairly improbable, but when there is a constant stream of records it is just a matter of time…

In any case, thank you a lot! It is good to know what “at-least-once” actually implies.
