Should we stick with at-least-once or switch to exactly-once-v2?

Hello!
In our Kafka Streams application we have processors that use key-value state stores. Some of these processors deduplicate events from input topics.

We check whether a new event is already present in this processor's state store: if it is, we skip the event; if not, we put it into the state store and forward it.
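For context, this is roughly what such a processor looks like (a minimal sketch; the class name and the "dedup-store" store name are illustrative):

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class DedupProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        // "dedup-store" must be connected to this processor in the topology
        this.store = context.getStateStore("dedup-store");
    }

    @Override
    public void process(final Record<String, String> record) {
        if (store.get(record.key()) != null) {
            return; // duplicate: already seen, skip it
        }
        store.put(record.key(), record.value()); // remember the event ...
        context.forward(record);                 // ... and forward it
    }
}
```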

If I understand correctly, with the at-least-once processing guarantee there is a possibility that our application persists an event in the state store but never forwards it: if the application crashes after the state store write but before the output record is produced, the input event is reprocessed on restart, found in the store, and skipped as a duplicate. In this scenario we lose the event and our data becomes inconsistent.

I think exactly-once-v2 is the solution for that. But there is a problem: we use only 2 Kafka brokers, and the docs require at least 3 brokers for this processing guarantee.
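On the application side the switch itself would just be a config change, something like this (application id and bootstrap servers are placeholders for our actual values):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfig {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-app");                    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092"); // placeholder
        // Switch from the default at_least_once to exactly-once-v2:
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```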

Am I right about at-least-once and our processor? Would you recommend switching to exactly-once-v2 in our case (we have only 2 Kafka brokers :confused:)?

Thank you in advance for your answers.

I think it does make sense to switch to exactly-once semantics.

If you only have 2 brokers, you (in general) face a difficult tradeoff between correctness and availability. 3 brokers are recommended so that you can tolerate one broker going down and still replicate writes to two brokers to guard against data loss.

If you only have two brokers, you cannot get both: guarding against data loss and staying available in case one broker goes down. If one broker goes down, you can either guard against data loss by not accepting any new writes (as writes cannot be replicated), or you can allow writes and stay available, but these writes won't be replicated to a second broker and are thus not guarded against data loss.

By default, brokers are configured with a replication factor of 3 and min.isr=2 for the internal transaction-state topic. You can change these configs to either replication=2 and min.isr=2 (guard against data loss, but no HA), or replication=2 and min.isr=1 (stay HA, but no guard against data loss). Pick your poison :slight_smile:

(I don’t know the concrete config names for the transaction-state-topic, but you can find them in the docs.)
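If I remember correctly, they should be something like the following (please verify the exact names and defaults against the docs for your broker version):

```properties
# Broker-side settings for the internal transaction-state topic (defaults shown)
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
```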


Thank you for the answer!

I discovered KIP-892: Transactional Semantics for StateStores. So if I understand correctly, there is a high possibility that our application would have to restore its state stores from scratch (with exactly_once_v2) when, for example, it was killed because of an out-of-memory error. I have huge state stores, so that would take too much time :frowning:

Yes, that’s right… As long as KIP-892 is not done, that can be a problem. Enabling standby tasks can help avoid downtime in this case.
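Standby tasks are enabled via the num.standby.replicas Streams config, e.g.:

```java
// In the application's Streams configuration (same Properties object as the rest):
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); // one warm replica per active task
```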


I didn’t know there was something like standby replicas, thank you for the tip :slight_smile: Can I ask additional questions about KIP-892? I am not sure if you are familiar with the details.

Is it correct that enabling the enable.transactional.statestores setting from KIP-892 will make StateStore.get (inside a Processor) return only committed records? If the result forwarded by my Processor depends on data saved (in state stores) while processing previous events, could it forward an incorrect result (because it might not see uncommitted changes to RocksDB)? I am not sure whether the Interactive Queries mentioned in KIP-892 also cover StateStore.get.

Edit:
I found this fragment:

> When reading records from the StreamThread, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure that uncommitted writes are available to query

Does it mean that StateStore.get inside a Processor will see uncommitted values (with enable.transactional.statestores=true)?
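If I read the RocksJava API correctly, the mechanism from that fragment behaves roughly like this standalone sketch (not Kafka Streams code; the DB path is illustrative):

```java
import org.rocksdb.*;

public class WbwiDemo {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (final Options options = new Options().setCreateIfMissing(true);
             final RocksDB db = RocksDB.open(options, "/tmp/wbwi-demo");
             final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
             final ReadOptions readOptions = new ReadOptions()) {

            // Buffer a write in the (uncommitted) batch:
            batch.put("key".getBytes(), "uncommitted".getBytes());

            // A read through the batch sees the uncommitted write ...
            final byte[] fromBatch = batch.getFromBatchAndDB(db, readOptions, "key".getBytes());
            System.out.println(new String(fromBatch)); // prints "uncommitted"

            // ... while the underlying DB does not, until the batch is written to it:
            System.out.println(db.get("key".getBytes())); // prints "null"
        }
    }
}
```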

> will make StateStore.get (inside a Processor) return only committed records?

No. Inside the processor, you see uncommitted data. This is required: the Processor must see its own uncommitted writes.

> Does it mean that StateStore.get inside a Processor will see uncommitted values (with enable.transactional.statestores=true)?

Yes.

Note that stores are sharded, and thus each Processor has its own shard of the state; it cannot see uncommitted writes from parallel processors, as it cannot access other shards to begin with.

For IQ it will be different though (and a behavior change), from what I remember. We would only serve committed data (right now, we serve uncommitted data via IQ). It was an open question whether we should/can also allow serving uncommitted data and introduce some “IQ isolation level” users can pick from.