Exactly once processing and Stateful Processors

I have put this question in the “Stream Processing” category, although I am not totally convinced this is the right place.

I am trying to understand how to architect and build stateful services in the context of exactly once processing.

So far I understand how to perform exactly once processing using transactions in the context of stateless processors. I don’t understand how Kafka is intended to be used to perform exactly once processing when processes must maintain state.
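To make clear what I mean by the stateless case, here is a minimal sketch of the transactional consume-process-produce loop using the plain Java clients. The topic names, group id and transactional.id are placeholders of my own, and transform() just stands in for whatever the stateless operation is:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class StatelessExactlyOnce {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092");
        cp.put("group.id", "stateless-transform");            // placeholder group id
        cp.put("enable.auto.commit", "false");                 // offsets are committed via the transaction
        cp.put("isolation.level", "read_committed");           // only read committed transactional data
        cp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("transactional.id", "stateless-transform-1");   // must be unique per producer instance
        pp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            consumer.subscribe(List.of("input-topic"));        // placeholder topic name
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;

                producer.beginTransaction();
                for (ConsumerRecord<String, String> r : records) {
                    // Stateless transform: everything consumed is fully "used up" here.
                    producer.send(new ProducerRecord<>("output-topic", r.key(), transform(r.value())));
                }
                // Commit the consumed offsets as part of the same transaction, so the
                // output records and the offsets become visible atomically.
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (TopicPartition tp : records.partitions()) {
                    List<ConsumerRecord<String, String>> part = records.records(tp);
                    offsets.put(tp, new OffsetAndMetadata(part.get(part.size() - 1).offset() + 1));
                }
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            }
        }
    }

    private static String transform(String value) {
        return value.toUpperCase();                            // stand-in for the real stateless transform
    }
}
```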

There doesn’t appear to be much documentation on this online, probably because it is a difficult problem.

I have come up with two potential solutions which use Kafka, and a third solution which comes pretty close to providing exactly once guarantees but not completely.

Allow me to explain -

Consider an example problem of:

  • Reading data from an input topic
  • Performing some stateful transform on this data
  • Producing data to an output topic
  • The simple example of calculating a moving average is good enough. The moving average calculation must maintain some state (maybe: sum, total number of records) to calculate the moving average

Here are 3 possible approaches. I don’t know if any of these are along the lines of how Kafka expects me to build such a system. Keep in mind that transactions will be in operation for all of the following options.

  1. Create a state topic, and enable log compaction. Store the entire application state in a message and use the same key for every message. This causes the topic to retain only the most up to date state. In applications which maintain large quantities of state this may become inefficient due to the large volume of data transferred to the state topic. (A sketch of this approach follows after this list.)

  2. Create a state topic, enable log compaction. Only store the “bits” of the state which change. For example, if we had a process which contained a hash map, every time we add a key to the hash map, create a new message on the state topic, with the hash as the key. Every time we update a value in the hash map, again store the value in a message with the hash as the key. To delete an entry from the hash map we would need a special message type, again with the key set to the hash to be deleted. This is much more efficient in terms of not sending large messages to the state topic; however, I tried to build a service like this and found it extremely complicated to write and implement correctly. I believe this approach would in general be exceedingly hard to maintain without introducing bugs.

  3. Store the state somewhere else, not in Kafka. Possible places to store the state could be on disk or in MongoDB. In this case, because not all data movement goes through Kafka, it is not possible to implement exactly once semantics. However, we can get pretty close by updating the state in Mongo/on disk just before committing the Kafka transaction. This is expected to fail only in rare circumstances, such as the power going off in the critical code section between sending data to Mongo (etc.) and committing to Kafka. This design will probably be “good enough”.
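To illustrate approach 1, here is a sketch of how the transaction body from the stateless loop above could be extended. The state topic name, the constant key and serializeState() are placeholders of my own; the point is only that the state snapshot is produced inside the same transaction as the output records and the offsets:

```java
// Approach 1 (sketch, extending the transaction body from the earlier loop):
// inside the same transaction that produces the output records and commits the
// consumer offsets, also produce a full snapshot of the application state to a
// compacted "state" topic, always under the same key, so that log compaction
// keeps only the latest snapshot.
producer.beginTransaction();
producer.send(new ProducerRecord<>("output-topic", key, result));
producer.send(new ProducerRecord<>("state-topic", "moving-average-state", serializeState(state)));
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
// On startup, the process rebuilds its state by reading "state-topic" from the
// beginning (with isolation.level=read_committed) and keeping the last snapshot.
```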

My question is: How should I approach the problem of designing stateful services with Kafka?

There’s actually a 4th option which I didn’t include.

  4. Manually add a unique identifier to each message and add de-duplication logic to each process. This might be necessary in cases where state is not entirely stored within Kafka, if the guarantees provided by option 3 are not sufficient.
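As a rough sketch of what that de-duplication logic might look like (the class name and the in-memory set are entirely my own illustration; a real version would have to persist the set of seen ids alongside the processing results and bound its size, otherwise it is lost on restart):

```java
import java.util.HashSet;
import java.util.Set;

// Option 4 sketch: each message carries a unique id (e.g. a UUID set by the
// original producer), and the consumer skips ids it has already processed.
public class Deduplicator {
    private final Set<String> seen = new HashSet<>();

    /** Returns true if the message should be processed, false if it is a duplicate. */
    public boolean firstTime(String messageId) {
        return seen.add(messageId);
    }
}
```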

Hello @kafkaesque ,

I believe Kafka Streams and ksqlDB use the same strategy to store their state, which is RocksDB on top of the local filesystem, used as a temporary state store while processing is ongoing. You can configure the directory using the state.dir property for Kafka Streams.

By default the processing guarantee is set to “at least once”, but you can configure it to exactly once. As long as records are processed entirely within Kafka (source Kafka topic → processing with Kafka Streams / ksqlDB → destination Kafka topic), you can achieve an exactly once processing guarantee for each record.
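For example, a configuration along these lines should enable it (the application id, bootstrap servers and state directory below are just placeholder values):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Sketch: enabling exactly-once processing in Kafka Streams and pointing the
// local RocksDB state directory somewhere explicit.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "moving-average-app");   // placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");    // local RocksDB state dir
// "exactly_once_v2" on recent Kafka versions (2.8+); older versions use "exactly_once".
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
```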

Also, your destination topic’s cleanup policy (delete / compact) depends on your operation. The simplest rule of thumb is: if the operation is stateless, configure the cleanup policy of the resultant (destination) topic to delete; if it is stateful, use compact. That works for most use cases, if not all.
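For example, to create a compacted destination topic for a stateful operation you could do something like this with the admin client (topic name, partition count and replication factor below are placeholders):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Destination topic for a stateful operation: keep only the latest
            // value per key instead of deleting records by retention time.
            NewTopic topic = new NewTopic("averages-output", 3, (short) 1)
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```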

Kafka Streams and ksqlDB also use internal Kafka topics to persist the state of stateful operations. You can find them in your topic list; Kafka Streams names them with a ‘-changelog’ suffix (`<application.id>-<store-name>-changelog`). I do not think there is any need for another data storage system for this purpose unless your business use case requires it.

Thank you.

Hi @shubhamshirur1 and thanks for your reply.

I must admit I am a bit confused.

I too have read that Kafka Streams uses a RocksDB instance to save its internal state to the local file system.

If this is true, I don’t understand how a Kafka Streams process can guarantee exactly once transactions. This seems to conflict with documentation elsewhere, which states that operations external to Kafka (for example, writing to a database or to disk storage, sending emails, etc.) cannot be absolutely guaranteed as part of an exactly once transactional process.

If Kafka Streams saves internal state to disk, I haven’t fully understood why this doesn’t then break exactly once semantics.

However, in your final paragraph you have stated that Kafka Streams persists state to internal Kafka topics.

If this is the case, it would make sense that this enables exactly once processing - all state goes to and comes from a Kafka system. There is no data exchange to anything external like on-disk storage.

But that appears to conflict with your first paragraph, so the situation of what is actually happening is not totally clear to me.

For example,

If an operation needs to store intermediate state while processing the data, that state is stored in a temporary, quick read-write RocksDB store, but the resultant states are stored in topics.

As mentioned earlier, within Kafka you can leverage the exactly once processing guarantee. That means input data comes from a Kafka topic, is processed in Kafka Streams / ksqlDB, and the output data is written to a Kafka topic.
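As an example, a minimal sketch of the moving average from the original question as a Kafka Streams topology might look like this. The topic names, application id and store name are my own placeholders; the per-key aggregate (“count:sum”) is the state that lives in RocksDB and in the store’s changelog topic:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class MovingAverageApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "moving-average-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("numbers-input", Consumed.with(Serdes.String(), Serdes.Double()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
               // The aggregate is the per-key state: kept locally in RocksDB and backed
               // by a changelog topic named <application.id>-moving-average-store-changelog.
               .aggregate(
                   () -> "0:0.0",
                   (key, value, agg) -> {
                       String[] parts = agg.split(":");
                       long count = Long.parseLong(parts[0]) + 1;
                       double sum = Double.parseDouble(parts[1]) + value;
                       return count + ":" + sum;
                   },
                   Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("moving-average-store")
                               .withKeySerde(Serdes.String())
                               .withValueSerde(Serdes.String()))
               // Derive the running average from the state and publish it downstream.
               .mapValues(agg -> {
                   String[] parts = agg.split(":");
                   return Double.parseDouble(parts[1]) / Long.parseLong(parts[0]);
               })
               .toStream()
               .to("averages-output", Produced.with(Serdes.String(), Serdes.Double()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```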

But when it comes to pipelining data between Kafka and other data systems, there is no Kafka configuration that enables exactly once semantics. You may have to choose at most once or at least once semantics based on your use case.

This is my understanding.

I must be missing something.

  • If an operation needs to store intermediate state while processing the data, that state is stored in a temporary, quick read-write RocksDB store
  • but the resultant states are stored in topics

Don’t these two statements conflict?

It isn’t clear to me exactly what you said here.

I have two possible guesses. Before I explain what those are let me just make two points about processes which connect to Kafka to help explain.

  • Stateless Processes: A process which consumes data from Kafka, then produces and commits in a single step. It does not need to store long-term state, because whatever data was consumed is “used up” in its entirety in the produce-and-commit stage. Since producing and committing the consumer offsets is done as part of a single transaction, there is no possibility of data loss.
  • Stateful Processes: Unlike the above, some data may remain “inside” the process, having been consumed but not yet fully produced. This means we consume some “information” from Kafka, and part of that information is purged from the process when it produces new events to Kafka. However, part of this information remains inside the process after the consume-produce-commit loop.

The question is: what should be done with that state in the second case?

Back to what you said above: Do you mean to say that:

  • The remaining intermediate state is stored in RocksDB only
  • “but the resultant states are stored in topics” → this is just normal producing of events?

OR did you mean that:

  • The remaining state is stored in RocksDB immediately, and then at some later time it is asynchronously produced to Kafka?

Thanks

There are two things I am talking about here; they are not conflicting.

You are right about stateful operations.

When you are performing stateful operations, while processing is ongoing the application needs to store intermediate states (in order to calculate the final state, for example in any windowed aggregation). Those intermediate states are stored in RocksDB. You cannot query data from Kafka topics on an ad hoc basis, so we cannot use a topic directly as the state store. But the intermediate states are also produced onto Kafka topics as immediate results, so that Kafka clients are able to consume the data.

Coming to your question: for example, suppose the Kafka Streams microservice / ksqlDB cluster goes down. It is expected to resume from where it left off whenever it comes back up. In this case, what happens to the intermediate states in RocksDB? Those are flushed to the filesystem and reloaded once ksqlDB / Kafka Streams comes back online.
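To see that reload step happening, one way (just an illustration, the listener class below is my own example) is to register a restore listener before starting the application. If the local state directory is missing or stale, Kafka Streams replays the changelog topics to rebuild the RocksDB stores, and these callbacks fire while it does so:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition tp, String storeName, long startOffset, long endOffset) {
        System.out.printf("Restoring %s from %s (%d..%d)%n", storeName, tp, startOffset, endOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName, long batchEndOffset, long numRestored) {
        System.out.printf("Restored %d records of %s%n", numRestored, storeName);
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String storeName, long totalRestored) {
        System.out.printf("Finished restoring %s (%d records)%n", storeName, totalRestored);
    }
}

// Usage (before start()): streams.setGlobalStateRestoreListener(new LoggingRestoreListener());
```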

This is my understanding on state store lifecycle.

Ok thank you, that’s very helpful.

Presumably everything you have just described is specific to Kafka Streams?

So if I were to write a process myself using regular producers and consumers, how would I integrate that with RocksDB?

Can I access an on-disk RocksDB store as part of the Kafka API?

Finally just one point to make:

I believe this method (using RocksDB + sending state asynchronously) gives exactly once processing in all but the most unlikely of failure combinations.

For example: if you commit your state to RocksDB and then commit to Kafka, there is a small window of time in which a failure would cause you to lose data. The same is true if you commit to Kafka first and then commit to RocksDB. (Actually, that will probably result in duplicates rather than lost data.)

Is my thinking correct?