Are changelog updates and topic updates batched together within a topology execution?

Imagine you have a simple topology, single input topic, one Key Value State Store utilizing RocksDB, single output topic.

Flow: Read from input topic, update KV Store with a count, write to output topic.
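For reference, a minimal sketch of what such a topology might look like (topic and store names are made up for illustration):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountTopology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               // RocksDB-backed KV store, backed by a changelog topic on the brokers
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
               .toStream()
               .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```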

Imagine that this flow processes 40 messages very quickly, and that these messages (40 to the output topic and 40 to the changelog backing topic, assuming all unique keys) are batched and sent to the broker as a unit of work.

batch update: messages 1 - 40 → SUCCESS

This of course works as expected. If the application were to die after this point, a new application instance would be able to continue, because all data was in that successful batch commit.

Question: Is it ever possible that a batch of messages being committed would not contain all messages from a single topology execution? Recall that two writes are required to successfully process the topology: one to the state store changelog topic and one to the output topic.

For example: Think about a particular message, let’s say it is message #35. Would it ever be possible to have the following batching occur?

batch update 1: messages 1 - 35 (changelog only) → SUCCESS

batch update 2: message 35 (output topic only) - message 40 → FAIL?

Why am I asking? There is a concern that state store changelog updates might be committed independently of the output topic updates if they are not always grouped in the same batch of commits to the broker. If they are split between batches and the broker connection is lost between those batches, then we would have incorrect state when the “new” application instance is started.

We have looked and cannot see where batching of output topic and changelog updates might occur in the code. I feel like we are missing something fundamental about Kafka. Is this handled somehow behind the scenes?

Thanks for any help!

Writes to the changelog topic and writes to the output topic are not guaranteed to be done atomically. As a matter of fact, the leader replicas of the changelog partition and the output partition might be on different brokers, and thus the producer needs to connect to two brokers and send two separate batches.

However, this is also not a correctness problem, because writes are not really “committed.” Writes are just sent and acknowledged after replication has happened (depending on your configs). And you cannot look at the write path alone; you also need to consider the consumer, which is what actually commits input topic offsets to record progress.

To ensure all writes are successful, Kafka Streams first flushes all pending writes, and only afterwards (i.e., after receiving all acks) commits offsets using the consumer. Thus, if an error occurs between the two writes, input offsets won’t be committed either, and after fail-over the new consumer would reprocess the input data. Of course, this provides at-least-once processing semantics.
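Conceptually, the ordering looks something like this (a simplified sketch of the principle, not the actual Kafka Streams internals):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class AtLeastOnceCommitSketch {

    // Simplified illustration of the commit order described above; the real
    // logic lives inside Kafka Streams, this only shows the principle.
    static void commit(KafkaProducer<String, String> producer,
                       KafkaConsumer<String, String> consumer,
                       Map<TopicPartition, OffsetAndMetadata> progress) {
        producer.flush();              // block until all pending output/changelog writes are acked
        consumer.commitSync(progress); // only then commit input offsets to record progress
        // If a failure happens before commitSync(), offsets remain uncommitted and
        // the input records are simply reprocessed after fail-over (at-least-once).
    }
}
```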

If you want to get exactly-once semantics, you can set the config processing.guarantee="exactly_once_v2": in this case, all interactions (i.e., writing to the changelog topic, writing to the output topic, and committing offsets) are done atomically using Kafka transactions.
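For example, a minimal configuration sketch (application id and bootstrap servers are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfig {

    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-app");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Requires brokers 2.5+; changelog writes, output writes, and offset
        // commits are then wrapped in a single Kafka transaction.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```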

