Exactly once processing and Stateful Processors

I have put this question in the “Stream Processing” category, although I am not totally convinced this is the right place.

I am trying to understand how to archetect and build stateful services in the context of exactly once processing.

So far I understand how to perform exactly once processing using transactions in the context of stateless processors. I don’t understand how Kafka is intended to be used to perform exactly once processing when processes must maintain state.

There doesn’t appear to be much documentation on this online, probably because it is a difficult problem.

I have come up with two potential solutions which use Kafka, and a third solution which comes pretty close to providing exactly once guarantees but not completely.

Allow me to explain -

Consider an example problem of:

  • Reading data from an input topic
  • Performing some stateful transform on this data
  • Producing data to an output topic
  • The simple example of calculating a moving average is good enough. The moving average calculation must maintain some state (maybe: sum, total number of records) to calculate the moving average

Here are 3 possible approaches. I don’t know if any of these are along the lines of how Kafka expects me to build such a system. Keep in mind that transactions will be in operation for all of the following options.

  1. Create a state topic, and enable log compaction. Store the entire application state in a message and use the same key for every message. This will cause the topic to maintain only the most up to date state. In some applications which maintain large quantities of state this may become inefficient due to the large volume of data transfered to the state topic.

  2. Create a state topic, enable log compation. Only store the “bits” of the state which change. For example, if we had a process which contained a hash map, every time we added a key to the hash map, create a new message on the state topic, with the hash as the key. Every time we update a value in the hash map, again store the value in a message with the hash as the key. To delete an entry from the hash map we would need a special message type, again with the key being set to the hash to be deleted. This will be much more efficient in terms of not sending large messages to the state topic, however I tried to build a service like this and found it was extremely complicated to write and try and implement correctly. I believe this approach in general would be exceedingly hard to maintain without introducing bugs.

  3. Store the state somewhere else, not in Kafka. Possible places to store the state could be on disk or in MongoDB. In this case, because we are not using Kafka for all possible ways in which data can transfer, it is not possible to implement exactly once semantics. However we can get pretty close by updating the state in Mongo/on disk just before committing the Kafka transaction. This is expected to fail only in rare circumstances, such as the power going off in the critical code section between sending data to Mongo (etc) and comitting to Kafka. This design will probably be “good enough”.

My question is: How should I approach the problem of designing stateful services with Kafka?

There’s actually a 4th option which I didn’t include.

  1. Manually add a unique identifier to each message and add logic to processes for manual de-duplication. This might be necessary in cases where state is not entirely stored within Kafka, if the guarantees provided by option 3 are not sufficient.

Hello @kafkaesque ,

I believe Kafka streams and Ksqldb uses same strategy to store its state which is rocksdb on top of local filesystem for temporary state store while processing is on going. You can configure the directory using state.dir property for kafka streams.

By default processing guarantee is set to “at least once” but you can configure it to exactly once. As long as data records are processed within Kafka (Kafka topic src → processing using Kafka streams / ksqldb → Kafka topic dest), you can achieve exactly once processing guarantee for each record.

Also, your destination topics cleanup policy (delete / compact) depends on your operation. Simplest way to configure that is if the operation is stateless, configure cleanup policy of your resultant (destination) topic of that operation to delete and if stateful then compact. It would work for most of the use cases if not all.

Kafka streams and Ksqldb also uses internal kafka topics to persist the state of operation for stateful operations. You can find them in your internal topic list, starts with ‘_’ and I do not think there is need of getting any other data storage system for this purpose unless there is any other need from your business use case.

Thank you.

Hi @shubhamshirur1 and thanks for your reply.

I must admit I am a bit confused.

I too have read that Kafka Streams use a rocksdb instance to save their internal state to the local file system.

If this is true I don’t understand how it can be possible for a Kafka Stream process to guarantee exactly once transactions. This conflicts with the documentation elsewhere which states that operations external to Kafka (for example, writing to a database or to disk storage, sending emails etc) cannot be absolutely guaranteed as part of an exactly once transactional process.

If Kafka Streams saves internal state to disk, I haven’t fully understood why this doesn’t then break exactly once semantics.

However, in your final paragraph you have stated that Kafka Streams persists state to internal Kafka topics.

If this is the case, it would make sense that this enables exactly once processing - all state goes to and comes from a Kafka system. There is no data exchange to anything external like on-disk storage.

But that appears to conflict with your first paragraph, so the situation of what is actually happening is not totally clear to me.

For example,

If operations needs to store intermediate state while processing the data, those are stored in temporary quick read-write rocksdb but resultant states are stored in the topics.

As mentioned earlier, within kafka you can leverage exactly once processing guarantee. That means, if input data is coming from kafka topic and processing in kafka streams / ksqldb and putting output data to kafka topic.

But when it comes to data pipelining between different data systems and kafka there is no kafka configuration enabling exactly once semantics. You may have to choose at most once or at least once semantics based upon your use case.

This is my understanding.

I must be missing something.

  • If operations needs to store intermediate state while processing the data, those are stored in temporary quick read-write rocksdb
  • but resultant states are stored in the topics

Don’t these two statements conflict?

It isn’t clear to me exactly what you said here.

I have two possible guesses. Before I explain what those are let me just make two points about processes which connect to Kafka to help explain.

  • Stateless Processes: A process which consumes data from Kafka, then produces and commits in a single step. It does not need to store long term state, because whatever data was consumed is “used up” in its entirity in the produce and commit stage. Since producing and comitting the consumer offsets is done as part of a transaction, there is no possibility of data loss.
  • Stateful Processes: Unlike the above, some data may remain “inside” the process which was consumed but not yet fully produced. This means that we consume some “information” from Kafka, and part of that information is purged from the process when it produces new events to Kafka. However part of this information remains inside Kafka after the consume-produce-commit loop.

The question is about what to do about that state in the second example?

Back to what you said above: Do you mean to say that:

  • The remaining intermediate state is stored in RocksDB only
  • “but resultant states are stored in the topics” → this is just normal producing of events?

OR did you mean that:

  • The remaining state is stored in RocksDB immediatly and then at some later time it is asynchronously produced to Kafka?

Thanks

There are two things I am talking about here, those are not conflicting.

You are right about stateful operations.

When you are performing stateful operations, while processing is on going, it does need to store intermediate states (in order to calculate final state - for ex. any windowed aggregation), those states are intermediate states and are stored in RocksDB. You can not query data from Kafka topics on adhoc basis, so that we can not leverage it as state store. But intermediate states as immediate results are produced onto Kafka topics as well so that Kafka clients are able to consume the data.

Coming to your question, For example in case Kafka streams microservice / Ksqldb cluster goes down. It is expected to start from where it has left whenever it comes up. In this case, what happens to the immediate states in rocksdb? Those are flushed to filesystem and reloaded back once Ksqldb / Kafka streams comes up online.

This is my understanding on state store lifecycle.

Ok thank you, that’s very helpful.

Presumably everything you have just described is specific to Kafka Streams?

So if I were to write a process myself using regular producers and consumers, how would I integrate that with RocksDB?

Can I access an on-disk RocksDB store as part of the Kafka API?

Finally just one point to make:

I believe this method (using RocksDB + sending state asynchronously) gives exactly once processing in all but the mose unlikely of failure combinations.

For example: If you have commit your state to RocksDB and then commit your state to Kafka there is a small window of time where if a failure were to occur, then you will lose data. The same is true if you commit to Kafka first and then commit to RocksDB. (Actually probably this will result in duplicates, rather than lost data.)

Is my thinking correct?

@kafkaesque

Hi, @otaylor asked me to pop in and help you out. I’m going to work backwards from your questions.

For example: If you have commit your state to RocksDB and then commit your state to Kafka there is a small window of time where if a failure were to occur, then you will lose data.

In Kafka Streams, the order of operations is as follows:

  1. Consume records from the input streams in a small batch
  2. Process them all, update the state - including updating the RocksDB state!
  3. Finally, In a single transaction, update all **input topic offsets, write messages to the rocksDB changelog topics, and write the messages to the output topics **. The transactional write is the essential part for “exactly once” processing (note: It’s actually effectively once, as you’ll see shortly).
  4. Repeat from 1.

If the Kafka Streams instance fails and loses all of its RocksDB state, upon restarting it will restore the data from the changelog topic(s). Note that no additional processing occurs until the changelog has been loaded into the RocksDB state. Once caught up, processing can resume as the rocksDB state and the consumer group input topic offsets are aligned.

Note that this is actually effectively once processing because side-effects, like calling a 3rd party REST API in the processing step, would be repeated when reprocessing a failure. However, note that the messages written to the output topic would only occur if step 3 above is a success - which makes it “exactly-once” for any consumers reading from that output topic. You’d want to ensure you have idempotent processing to avoid duplicating side effects.

So if I were to write a process myself using regular producers and consumers, how would I integrate that with RocksDB?

Hopefully the above gives you a better idea on that. The gist is that your internal state (rocksDB or otherwise) and your consumer group’s input offsets AND any output messages all need to be updated in a single transaction.

Alternately, you could choose to do something like Flink’s Checkpointing and Snapshots. This is an old blog (2018) but it describes pretty well how Flink uses two-phase commits to atomically update state, including stream offsets, while also writing to say a Kafka endpoint.

You can see the TwoPhaseCommitSinkFunction abstract class here.

So if I were to write a process myself using regular producers and consumers, how would I integrate that with RocksDB?

I think you’re probably less interested with RocksDB specifically, and more the process of exactly-once with state, using your own producer and consumer clients. The gist is that you either need to store all of your data in one spot (eg: Kafka Streams uses Kafka as the definitive store for everything, including backup state), or implement two-phase commits (eg: Flink internal state with Kafka Sinks). My two cents is to try to reuse something that already exists other than spinning your own, as there is a lot of thought and work that has gone into this problem. While I’m obviously biased since I work for Confluent, I’d recommend checking out Flink if you’re starting fresh today - I’ve done a lot of work with Kafka Streams in the past, but I like the Flink model of separating out the checkpointing and snapshots from Kafka.

Finally, with regards to your very first post:

Here are 3 possible approaches.

  1. Create a state topic, and enable log compaction. Store the entire application state in a message and use the same key for every message.
    You will likely run out of room in your message if you’re trying to maintain any meaningful state. However, this pattern would be fine if you use a primary key as the message key: eg, OrderId, AccountId, ShipmentId, etc.
  1. Create a state topic, enable log compation. Only store the “bits” of the state which change. … This will be much more efficient in terms of not sending large messages to the state topic, however I tried to build a service like this and found it was extremely complicated to write and try and implement correctly.

I’d strongly (very strongly) recommend against this pattern. It will easily get out of hand and be very difficult to reason about, evolve, and debug - as you already observed. One think I wanted to point out is that do not be afraid of sending “large messages” to the topic. In most cases 1 MB message size is in fact overkill for most state use cases, and in the event you need to send a very large message, you can always use the claim cheque pattern.

  1. Store the state somewhere else, not in Kafka.
    Yep, this is what Flink does. Two phase commits are required.
  1. Manually add a unique identifier to each message and add logic to processes for manual de-duplication. This might be necessary in cases where state is not entirely stored within Kafka, if the guarantees provided by option 3 are not sufficient.

If you don’t have 2-phase commit available, you could do this. You would need to give the consumers some sort of offset-based or stream-time based window to keep open for deduplication purposes (eg: keep the last 24h of stream time). However, note that you would still need to atomically update your internal derived state AND the input offset consumption progress, otherwise you do not have exactly-once processing - you just have deduplicated input.

Hope this helps

Adam

2 Likes

Hi,

Thank you for taking the time to respond with such a detailed message. It will take me some time to absorb this information.

I think I need to ask a couple of basic questions to try and get a complete understanding.

  1. Finally, In a single transaction, update all **input topic offsets, write messages to the rocksDB changelog topics, and write the messages to the output topics **. The transactional write is the essential part for “exactly once” processing (note: It’s actually effectively once, as you’ll see shortly).
  • If my understanding is correct, Kafka Streams chooses to use RocksDB. And “Vanilla” Kafka (not “Streams”) does not use RocksDB.
  • Previously people have said things to me like “Kafka is built on top of RocksDB”. But I suspect that this is a misunderstanding and that the true statement is more like “Kafka Streams use RocksDB to persist state from memory to disk”.
  • I would guess that RocksDB is used by Kafka Streams as a low latency data store. A way to persist state which exists in Memory to disk at disk latencies rather than “Kafka Transaction” latencies, which typically would be expected to be much higher.
  • I have found that comitting a transaction after processing each individual event leads to very low throughput. That appears to be because the transactional commit is quite a high latency operation, just in the same way as a producer flush is a high latency operation.

Thanks again for your help.

  • If my understanding is correct, Kafka Streams chooses to use RocksDB. And “Vanilla” Kafka (not “Streams”) does not use RocksDB.

Correct. But just think of RocksDB as a high-performance cache that spills to disk, and that can restore from disk if the files are available. You could replace RocksDB with an InMemory KeyValue store, or anything else that implements the KeyValueStore.

  • Previously people have said things to me like “Kafka is built on top of RocksDB”. But I suspect that this is a misunderstanding and that the true statement is more like “Kafka Streams use RocksDB to persist state from memory to disk”.

Yep, that would be a misunderstanding on someone’s part. You indeed have it correct - Kafka Streams uses RocksDB to persist state to disk, but with the goal of providing low-latency, high-performance key-based reads, upserts, and deletes.

  • I would guess that RocksDB is used by Kafka Streams as a low latency data store. A way to persist state which exists in Memory to disk at disk latencies rather than “Kafka Transaction” latencies, which typically would be expected to be much higher.

Yep exactly. Kafka Streams relies on fast in-memory IO (and SSD IO), say via RocksDB, to do performant joins, aggregations, lookups, etc. Latency is in the micro-second range.

Kafka itself (the topics, broker, etc) delivers the data to consumers in a relatively low-latency manner, though in the milli-second range.

  • I have found that comitting a transaction after processing each individual event leads to very low throughput. That appears to be because the transactional commit is quite a high latency operation, just in the same way as a producer flush is a high latency operation.

Yep, you got it. You’ll get the best throughput on Kafka using batching, as there is always some network and broker overhead that you’ll incur, be it per batch or per message. Same with transactions. So your best bet is to batch up your transactions the same way you batch up your consumer and producer batches, and you’ll get much better throughput. It’s all tradeoffs really.

It does sound like you’re on the right track in terms of figuring out the ins and outs. I would just say don’t get too caught up on RocksDB, just think of it as a high performance KV store that writes its data to the SSD. Kafka Streams works well with it because it can repopulate itself very quickly from the State Changelog topic such as when scaling instance counts up or down. If the data is already persisted to the disk (eg, the instance crashed due to power failure, then came back up), then you can restore your RocksDB based state almost instantly.

But to simplify the mental model, just think of it as a high performance key-value store, that you could swap out if you so chose (for eg, I ran some Kafka Streams applications back in the day that used DynamoDB as the Key-Value store, instead of RocksDB - just implemented the KeyValueStore interface I mentioned earlier).

1 Like

Hi and thanks again for your comprehensive reply.

I think I’m close to understanding everything, I just have a few more questions.

  • I am pretty sure I know the answer to this question but just to check I am not misunderstanding anything: Is the changelog topic stored on a Kafka topic? I’m fairly confident the answer is yes.

That leads me to ask:

  • Is RocksDB also used because it provides a convenient way to produce data for the changelog topic?
  • Presumably it also has a convenient way of consuming data from the changelog topic to restore its state if necessary too?

I think what I’m not currently understanding is how the transaction commit to Kafka is synchronized with the transaction commit to RocksDB.

Since these are two different systems, I thought that it was not possible to commit their transactional state simultaniously, as would be required for exactly once processing?

So is it the case that we can get almost exactly once processing but not exactly in an absolute sense?

In other words, many systems would try to place the two lines of code committing to each system as close to each other as possible. Is that what Kafka Streams does?

In other words:

  1. Setup all state to be comitted to both RocksDB and Kafka
  2. Commit to RocksDB
  3. Commit to Kafka

Thanks again for your help, I very much appreciate it.

Just had another thought, after thinking about this for a bit more:

  • If I want to implement exactly once transactional behaviour (in an absolute sense) between Kafka and some other system then do I need to engineer my own custom two-phase commit logic?
  • This “other system” could be RocksDB, or it could be something more simple like a file on disk.
  • This might include some kind of specialized message sent to Kafka to indicate pre-commit state for the whole system, followed by a commit message for the whole system.
  • These pre-commit and commit messages would be seperate to the pre-commit and commit messages which Kafka automatically inserts as part of its regular transaction operation.

Does my question make sense? I’m not sure how clear this is?

To take it to the extreme, we could perhaps imagine some system which is composed of Kafka, a SQL database, some files on disk, and MongoDB. (I’m just listing a range of common technologies here.)

If we wanted to have exactly once transactions in some processor which can read and write from these individual systems, can this be done by implementing some form of custom two-phase commit logic which leveraged the existing transactional behaviours of each of these systems?

Perhaps my question is a little too crazy…?

Is the changelog topic stored on a Kafka topic?

Yep! In a dedicated topic, with compaction and unlimited retention.

  • Is RocksDB also used because it provides a convenient way to produce data for the changelog topic?

    and

    I think what I’m not currently understanding is how the transaction commit to Kafka is synchronized with the transaction commit to RocksDB.

RocksDB doesn’t produce any data to the changelog topic. The changelog is updated independently of RocksDB in a Transaction, also containing the output messages, and input offsets. I believe that the exact order of operations is:

  1. Update RocksDB as you process the batch of input events
  2. Create output records (if any) and write them to the producer’s buffer.
  3. Once all the input records are processed, create a transaction for:
    a) The new input offsets
    b) Any output records
    c) RocksDB changelog records. (For this one I’m a bit fuzzy on the exact code - you’ll have to dig into it yourself. But keep in mind it’s at the state store level, and Kafka Streams orchestrates the entire process to ensure that no changelog records are missed).
  4. Commit the transaction.
    a) On success, consume the next batch of input records and process.
    b) On failure, retry until success, or final failure and program termination.

While this looks like it can cause the two to diverge in case of a failure, the Kafka Streams application does NOT do any more processing if it fails to write to RocksDB (lets say the disk is full). Basically, the only way that Kafka Streams event-driven processing resumes is if everything goes according to plan.

  • Presumably it also has a convenient way of consuming data from the changelog topic to restore its state if necessary too?

Changelog topic is only for restoring state. You can even choose to disable it if you like. You’d create the state store with Materialized#withLoggingDisabled().

So is it the case that we can get almost exactly once processing but not exactly in an absolute sense?

You can get exactly once processing from the perspective of a downstream consumer of the stateful application’s output records (3b above). They will only see the produced records once, regardless of how many times the upstream processor fails. What you wont get is “effectively once processing”, as a worst-case scenarion will see your application processing the same batch of records over and over and over again, failing at the end then restarting to process the same batch again. If it makes any calls outside of the system (eg: a REST call) then it may not be “exactly once” (say it charges you $1 per REST call).

In other words, many systems would try to place the two lines of code committing to each system as close to each other as possible. Is that what Kafka Streams does?

  1. Setup all state to be comitted to both RocksDB and Kafka
  2. Commit to RocksDB
  3. Commit to Kafka

Kinda - the thing here is that this process, in the case of ANY failure, only uses Kafka changelog (3) as the source of truth for rebuilding the RocksDB instance. RocksDB is effectively a cache for high performance KV pair operations. It is NOT the definitive truth, which is why changelogging is enabled by default. If you’re into databases, think of RocksDB as the row-based storage (b-tree) and Kafka changelog as the BinaryLog (or Write-Ahead-Log).

As for your next post:

  • If I want to implement exactly once transactional behaviour (in an absolute sense) between Kafka and some other system then do I need to engineer my own custom two-phase commit logic?

Check out the KIP for Kafka Two Phase commit. It’s in progress, but would deal with this scenario. I encourage you to not implement your own 2PC, but if you are interested, perhaps give these folks working on it a hand.

  • This “other system” could be RocksDB, or it could be something more simple like a file on disk.

If you’re writing to disk directly, you get atomic commits to your disk drive first. Then you could create an idempotent producer to write the info from the disk files to Kafka.

  • This might include some kind of specialized message sent to Kafka to indicate pre-commit state for the whole system, followed by a commit message for the whole system.
  • These pre-commit and commit messages would be seperate to the pre-commit and commit messages which Kafka automatically inserts as part of its regular transaction operation.

I’ll have to yield on these questions as I am no expert in implementing two-phase commits. However, aside from KIP-939, you could also check out this clever Kafka Sink Connector that uses two phase commits very similar to what you’re describing. You can find the coordinator here, but you may have to grok a bit through the code - they’re using Kafka Message passing to orchestrate the two phase commit.

To take it to the extreme, we could perhaps imagine some system which is composed of Kafka, a SQL database, some files on disk, and MongoDB. (I’m just listing a range of common technologies here.)

If we wanted to have exactly once transactions in some processor which can read and write from these individual systems, can this be done by implementing some form of custom two-phase commit logic which leveraged the existing transactional behaviours of each of these systems?

You could. Orchestrating distributed commits is usually difficult, expensive in terms of both resource usage and developer hours, and can hide some damaging corner cases that you need to very carefully guard against.

If you want my 2 cents, keep it as simple as you can for as long as you can.

I recommend that you try to figure out how to idempotently process your data first and foremost. If you need exactly once processing, then consider using the tools available (eg: Flink or Kafka Streams) first - you may find that you can build a sidecar and let Flink to the exactly-once bit, and then you handle everything else in the DB or engine of your choice (eg Mongo). If you find that you need more than they can offer, then perhaps see if KIP-939 suits your needs - you can then help contribute, or at least get up to speed on the challenges you would face in creating your own. Also, take a look at that Kafka Sink connector I sent you, as I know it works for distributed transactions as I have personally tried it.

Hope this helps :slight_smile:

1 Like

What would happen if the write/commit to RocksDB (and therefore disk) succeed but then the power goes out before committing to Kafka? I would guess that when the process starts up again it must restore its state from the Kafka changelog and not from RocksDB on disk store? I am thinking in this exceptionally unlikely but possible case, the Kafka transaction would not have been committed but the state to RocksDB would have been committed.

Perhaps my guess is wrong because that would seemingly make having the state saved in RocksDB a bit pointless - how would you know if you can perform the simple operation of reading state from RocksDB vs the more complex operation of restoring state from a Kafka topic containing a changelog?

… unless there’s a way to detect that a transaction commit to Kafka did not happen after a commit of the state to RocksDB?

By the way, if I wanted to look at some example code for Kafka Streams, can you make any suggestion of where I should look? I’m going to have a search around now, but I’m not sure I would know where to look.

That makes sense - and it’s a good point. I think this is subtly different from the above scenario I described relating to power failure at the worst possible time? It’s easy to see why the concept of “exactly once” is a hard problem - there’s a lot of moving parts and possible sequences of events.

To run through the “power failure scenario”:

  1. A transaction starts
  2. A batch of events are consumed, causing internal processor state to change
  3. Some data is produced, and events are dispatched via producers
  4. The commit offsets (for the consumed batch of data) are dispatched via a producer
  5. This changed internal, in memory, state is written to disk (RocksDB)
  6. There is one final thing to do: Commit the Kafka transaction. But the power fails here
  7. From the Kafka broker point of view, a producer died with an open transaction. It will time out and abort, I guess?
  8. The power comes back on and the processor restarts
  9. It loads its state from RocksDB
  10. The processor opens a new transaction, and consumes the same batch of events again
  11. This causes the internal state to be updated again - and some set of events have been consumed twice, causing the internal state to be changed twice

At least, this is what I think happens. Did I miss understand something?

  • If we were to restore the processor state from the Kafka changelog rather than from RocksDB, there would be no issue. But how do we choose which data store to restore state from?

Ah - you have just answered my previous question here. Now I understand. I will leave what I wrote in case someone wants to read this in future.

But this does raise a new question for me. What does Kafka Streams use RocksDB for? Is it just a convenient utility for performing Key-Value store operations? If so why would it need to persist state to disk? If it will rebuilt it state from a Kafka changelog, writing a RocksDB state to disk seems like an additional operation which isn’t needed?

Yes - thank you very much again, this is both immensely helpful and also of great personal interest to me. I don’t know if anyone made you aware but I am hoping to do a talk at a Kafka meetup fairly soon - on this subject area, because I think it is so interesting. (And my understanding of it was quite limited.)

I’m going to try and find some information from the resources you have directed me to now. It might take me a little while to read through some source code. I’ll take a look and see if I can find something useful.

What would happen if the write/commit to RocksDB (and therefore disk) succeed but then the power goes out before committing to Kafka? I would guess that when the process starts up again it must restore its state from the Kafka changelog and not from RocksDB on disk store?

Yes - When Kafka Streams starts up, it won’t make any forward progress until it has verified that RocksDB is aligned with the changelog. The RocksDBStore.java restores itself using the changelog, as you can see here.

You can check out this presentation (2020) that talks more about Kafka Streams and RocksDB. You may find it useful. Though a bit older, it’s still largely accurate in the major things. Performance Tuning RocksDB for Kafka Streams' State Stores - Confluent

By the way, if I wanted to look at some example code for Kafka Streams, can you make any suggestion of where I should look? I’m going to have a search around now, but I’m not sure I would know where to look.
The RocksDB code above will show you how it restores from the changelog.

Start at TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets. This is where the commit process starts, and with it, flushing State to disk. Each stream task will update independently. (see here)

Basically, maybeCheckpoint leads to AbstractTask.flush(), where we flush the state for the state manager. This is where we write the data to disk. (see here)

Note that near the end of this thread, we call task.postCommit(false); AFTER committing (see here). I believe that I was wrong then about flushing RocksDB before writing to Kafka. It appears that we do write to RocksDB during processing (but only the in mem portion), and then update Kafka in a transaction, and THEN flush RocksDB to disk. So it doesn’t seem like RocksDB could be anything other than lagging or in sync with the changelog.

With regards to committing, you’ll want to look at TaskExecutor.commitOffsetsOrTransaction. Note how there are two types of EOS (Exactly once semantics). I forget why we have both ALPHA and V2, but use V2 (see more details here).

Anyways, hopefully that tracks. I haven’t looked at the code for a while and it’s been refactored a bit, but the gist is that we use the changelog as the source, RocksDb as a cache, and everything updates in a transaction.

To run through the “power failure scenario”:

  1. A transaction starts
  2. A batch of events are consumed, causing internal processor state to change
  3. Some data is produced, and events are dispatched via producers
  4. The commit offsets (for the consumed batch of data) are dispatched via a producer
  5. This changed internal, in memory, state is written to disk (RocksDB)
  6. There is one final thing to do: Commit the Kafka transaction. But the power fails here
  7. From the Kafka broker point of view, a producer died with an open transaction. It will time out and abort, I guess?
  8. The power comes back on and the processor restarts
  9. It loads its state from RocksDB
  10. The processor opens a new transaction, and consumes the same batch of events again
  11. This causes the internal state to be updated again - and some set of events have been consumed twice, causing the internal state to be changed twice
    At least, this is what I think happens. Did I miss understand something?

Steps 3, 4, and 6 (commit the txn) are atomic. They either all happen or not at all. The first steps of the transaction are visible in the Kafka log, but they are not committed, and are not consumed by any consumers. If you create a new transaction without closing that old one, nothing happens - the uncommitted transaction remains in the log as that’s something that happened, but it’s not something that consumers can react or read to.

  1. This causes the internal state to be updated again - and some set of events have been consumed twice, causing the internal state to be changed twice

Correct - which is why I called it “effectively-once” a bit earlier. The internal state transitions will occur twice, (once before the failure, once after), so if you have any non-idempotent operations, you may have a problem.

What does Kafka Streams use RocksDB for? Is it just a convenient utility for performing Key-Value store operations? If so why would it need to persist state to disk? If it will rebuilt it state from a Kafka changelog, writing a RocksDB state to disk seems like an additional operation which isn’t needed?

RocksDB is high performance key-value access that can spill to disk if needed. If you’re 100% sure you’ll never OOM, then feel free to use an in-memory KV store. If you’re not 100% positive, then RocksDB is better as it provides acceptable performance and safeguards you against memory limits.

Yes - thank you very much again, this is both immensely helpful and also of great personal interest to me. I don’t know if anyone made you aware but I am hoping to do a talk at a Kafka meetup fairly soon - on this subject area, because I think it is so interesting. (And my understanding of it was quite limited.)

Glad I can help! I suggest you take a look at Bill Bijeck’s “Kafka Streams in Action” if you’re interested more on Kafka streams. He’s written extensively about it.

1 Like

Ok, great - I think I’m finally understanding.

I think this is a key point:

I previously described a scenario of writing the RocksDB state before comitting a Kafka transaction. But that perhaps doesn’t make much sense, because the order is the wrong way round.

Here’s my explanation, please let me know if my understanding is wrong here.

As you later commented:

If the internal state (whereever that may be stored, RocksDB or otherwise) is “committed” first, and the Kafka transaction is committed second, then we get what you describe as “effectively-once” transactions. In other words, if we are counting some number of events, the count will be wrong, because we will process a batch of data twice in the case of failure.

However, if the Kafka transaction is completed first, the situation is a bit different. (I think this is how it can work.)

  • We would normally expect to lose some data in this case rather than process it twice.
  • However, if we read the state from a Kafka changelog, then we can reconcile any differences between the “Kafka” state and the “internal” process state.
  • You already described this process in the context of Kafka Streams.

So this explains how to write an exactly once processor.

  • You have to update your internal state but not commit it
  • You have to produce all events to Kafka including consumer transactions, produced events and changelog data relating to the internal state
  • You must commit the transaction to Kafka
  • Finally, you must commit your internal state
  • This might include flushing to RocksDB or disk or some other system
  • When you restart, you must use the changelog to rebuild the internal (in memory) state and verify data in other systems

The exact implementation for some of this could vary a bit. For example, let’s ignore all external systems like filesystems, RocksDB, etc. The “commit to internal state” which must follow the commit to Kafka is a bit interesting. We would probably need a way to implement internal process state as some kind of double-buffered system, so that we have ways of performing a commit and rollback operation.

I think I can now explain how to write a stateful processor which intergrates correctly with Kafka transactions. It’s not easy, but can be made easier with some thought about the processors design!

I would recommend anyone reading this in future watch the first 10 minutes of the above linked presentation.

It’s exceptionally interesting and helpful in that it shows an archetecture for building a stateful application. (The fact RocksDB is in use here doesn’t matter, you can exchange that for something else or just ignore it.)

1 Like

@kafkaesque - I went to ask one of the Kafka PMCs for some backup. You ask very good questions. :slight_smile:

Here’s what you asked:

It seems that I was missing a part of the equation, and it wasn’t immediately clear to me how it worked (and I ran out of time to poke around in the code :slight_smile: ). So I figured I’d go ask an expert.

Their response:

So it looks like my explanation of how the RocksDB and Kafka flushes / commits and offsets worked was missing some details. Hopefully the PMC member’s description helps.

This seems correct to me. This is definitely the “unoptimized” use case though, since ideally we would prefer to reuse the RocksDB disk state to reduce network IO and reduce downtime. I think the PMC member’s explanation of using the offset/state sync file solves that (#3 above).

Excellent! I hope that you find success. I’d say poke through the code a bit more to get a better idea, keeping in mind what the PMC member mentioned. I think you’ll want to start here. kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java at 1ca939128521459ef921d3ec71dda2a507ba1f15 · apache/kafka · GitHub

1 Like

Thanks for these additional details, I just wanted to clarify what is ALOS?

EOS = Exactly Once Semantics but I’m not sure what ALOS? Almost Once Semantics possibly?

I’m slightly confused by the explanation about deleting the file, but I think I know enough to answer it myself.

Conceptually, I understand the “optimized” case. There is a file which indicates whether or not the state can be restored from the RocksDB on disk data.

This additional file is used to indicate that the RocksDB on disk state is synchronized with Kafka. Both systems (Kafka and RocksDB) support transactions.

These systems can become de-synchronized because one must commit its transactional state before the other. Which ordering is chosen doesn’t matter that much because this aformentioned additional file is used to indicate whether both commits succeeded or something happened causing the program to crash during this double commit phase.

I’m not sure which way round it is.

  • If the additional file exists, does that indicate consistent or inconsistent state?

It doesn’t matter that much, I suppose. Either could be a valid choice.

This seems to suggest that the file is created at the start of the “commit” phase and deleted afterwards. However the above two quotes seem to suggest that the file existing means that the “commit state” is consistent.

For any future readers, it works something like this:

The existence of an additional file is being used to indicate that the RocksDB state and Kafka state are consistent. Meaning that a transaction has completed, and both transactional states of RockDB and Kafka are synchronized. Because these are two different systems and there is no way to synchronously commit to both of them in an atomic way, you must commit to one and then commit to the other. This is why a flush operation is required as part of the transactional commit process.

Here’s the general recipie. This is actually the complete solution to my original questions, including an optimization to prevent the full changelog needing to be read on restart:

  1. Open a transaction to each system. That might be: One transaction for Kafka, one for RocksDB, one for something else like a database. If you want to interface to some “system” which does not support transactions, you will have to build a way to make its state transactional. (See note.) One of your systems might be some in memory state. You may have to make your in memory state function in a transactional way. (Or maybe not. It depends.)
  2. Consume and produce data to each system.
  3. When it is time to commit, create a new file on disk. This file indicates that the different systems may now be in an unsynchronized state. Let’s just consider a process interfacing to Kafka and RocksDB. If this file exists, it indicates that the RocksDB and Kafka transactions might be out of sync.
  4. Commit to either Kafka or RocksDB. Now we are out of sync.
  5. Commit to the other system.
  6. When the commits to all systems have finished, we are now back in sync.
  7. Delete the file to indicate all systems are in a synchronized state.

This describes a possible way to implement exactly once transactions across multiple systems with an optimization which prevents the state needing to be read from a Kafka topic changelog on startup.

Note: Making a non-transactional system transactional:

  • What is needed is a way to indicate that a transaction is taking place, and a way to perform an atomic update of the state.
  • This could be as simple as a “double buffer” implementation plus some way to indicate the start and end of a transaction.
  • Consider an example of an array in memory.
  • To start a transaction you make a copy of the array.
  • You perform operations to change the array state during the transaction.
  • To commit, you swap a reference to both arrays.
  • One of the arrays contains the live state, the other contains the state as of the last synchronization point. (Commit.)
  • For more complex systems you can imagine how you might use the same trick of using a file to indicate the system is in an “intermediate” state with an open transaction.

Thanks again for helping me to fully understand the situation here. Not only has it been fantastically interesting, but this will help me to write up something to present at the next Kafka conference near me. (That’s the plan anyway.)

This design is pretty cool and I appreciate you taking the time to help me understand it in detail. :slight_smile: