Prevent Closing State Stores Dirty in Kafka Streams

My application keeps a large amount of data (a few hundred gigabytes) on state stores to join data together from our different data sources before streaming out to another database. Because we have a ton of state, the time taken to restore our KTables from the changelog topics results in significant downtime. We also use the exactly once V2 processing guarantee, so closing the state stores dirty requires restreaming all state from the changelog topics. We would prefer to avoid standby replicas due to the associated costs.
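For context, a minimal sketch of the relevant configuration; the application id, bootstrap servers, and exact values are placeholders, not our real setup:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Minimal sketch of the configuration described above; the application id
// and bootstrap servers are placeholders, not our real values.
public class JoinerConfig {
    static Properties baseConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "joiner-app");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
        // exactly_once_v2: after a dirty close, local state is wiped and
        // restored from the changelog topics on the next start.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // No standby replicas, since we want to avoid the extra cost.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
        return props;
    }
}
```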

From what I can tell, there is no way to always shut down cleanly, but we would like to shut down dirty as infrequently as possible. So far we have implemented this with a JVM shutdown hook, a state store listener, and an uncaught exception handler, but the JVM shutdown hook is the only reliable way to shut down cleanly. Everything else is flaky and often gets caught in deadlocks. Note that we are initiating this clean shutdown by calling streams.close().
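For reference, the shutdown-hook path (the only one that has been reliable for us) looks roughly like this; the topology and properties come from our existing app, and the close timeout is just an illustrative value:

```java
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

// Sketch of the shutdown-hook approach; the topology and properties come
// from the existing application, and the close timeout is an arbitrary
// illustrative value.
public class CleanShutdown {
    static void run(Topology topology, Properties props) throws InterruptedException {
        KafkaStreams streams = new KafkaStreams(topology, props);
        CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // close() blocks until all stream threads have shut down cleanly
            // (checkpointing the stores) or the timeout expires.
            streams.close(Duration.ofMinutes(2));
            latch.countDown();
        }));

        streams.start();
        latch.await(); // keep the main thread alive until the hook fires
    }
}
```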

This seems like a common problem that would be faced, but I could not find any information about achieving this functionality in documentation or online sources. Any advice or recommendations would be greatly appreciated.

Kafka Streams version: 3.6.2

there is no way to always shutdown cleanly

Can you elaborate? Of course, an error can happen at any time, but there is no fundamental barrier to “always shutdown cleanly” as long as no error happens.

So far we have implemented it with the JVM shutdown hook, state store listener, and uncaught exception handler, but the JVM shutdown hook is the only reliable way to shutdown cleanly.

Well, yes… the “state store listener” is, well, a listener to inform you about restore progress. And the “uncaught exception handler” informs you about an error after it happened, and allows you to restart the thread, or shut down the client or app. But because it’s called after an error already occurred, some tasks have already been closed dirty before the handler is invoked.

Everything else is flaky and often gets caught in deadlocks. Note that we are initiating this clean shutdown from calling streams.close().

Because they are not designed for calling streams.close()… From the “uncaught exception handler” you can return “shutdown client” or “shutdown app”, but as I said, some tasks were already closed dirty… And for the “state store listener”, I don’t understand why you would want to call streams.close() from it to begin with?
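For completeness, registering that handler looks roughly like this; whether you return SHUTDOWN_CLIENT or SHUTDOWN_APPLICATION is up to you, but either way the failed task was already closed dirty:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

// Sketch: the handler only runs after a stream thread has already died,
// i.e. after the failed task was closed dirty, so it cannot prevent that.
public class HandlerSetup {
    static void register(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> {
            // Other options: REPLACE_THREAD, SHUTDOWN_APPLICATION
            return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
    }
}
```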

This seems like a common problem that would be faced

Can you elaborate a little bit more on what the problem is? Closing clean means there was no error and you want to stop an instance (based on some external signal), and you can use the JVM shutdown hook for it. And it seems that is working for you as expected? After an error has happened, closing clean is not possible, as Kafka Streams needs to react to the error and clean up…

Maybe I misunderstand what you are asking about.

Hi there,

Thanks for getting back to me on this. Sorry, I had a mistake in my original post: I meant we are using the state listener (via streams.setStateListener), not the state restore listener.
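For clarity, a rough sketch of what we have wired up; the state listener only reports client state transitions, not restore progress:

```java
import org.apache.kafka.streams.KafkaStreams;

// Sketch of what we register via setStateListener: it only reports client
// state transitions (e.g. RUNNING -> ERROR), not restore progress.
public class ListenerSetup {
    static void register(KafkaStreams streams) {
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.ERROR) {
                // By the time we observe ERROR, the failed tasks have already
                // been closed dirty, so reacting here is too late.
                System.err.println("Streams transitioned to ERROR");
            }
        });
    }
}
```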

What my team is looking for is for the Kafka Streams app to reuse its state as much as possible when it restarts. For “normal” restart scenarios (e.g. deployments), this works great. Where we start to see issues is when we have errors. Any error causes every state store to shut down dirty. Is this just the intended behavior? It sounds like our approach of trying to preserve our state by calling streams.close() is not recommended or supported.

From what I understand, the fundamental problem with this approach is that we have already begun the shutdown process and some of the tasks are not healthy. I would have expected there to be support for rolling back the transaction in the state stores since this is what is done for the messages to the broker, but as far as I can tell, this is not the case.

So I probably need to rephrase my question to: what can we do to make state restoration from changelog topics as infrequent as possible? We can definitely ensure that errors thrown from our code are caught and attempt to shut down the application at that point (not sure if that’s supported or not. Will test shortly). Is there anything we can do to preserve our state in the case that errors are thrown from internal Kafka Streams code? I could see shutting down only the problematic task dirty while shutting down the unaffected tasks cleanly helping us a lot, but I have not seen any documentation suggesting that this is a real feature.
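For example, one option would be wrapping our own logic so that exceptions never reach the stream thread; a rough sketch (the topic name and transform are placeholders):

```java
import java.util.Collections;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

// Rough sketch: catch exceptions inside our own code so they never reach
// the stream thread. The topic name and transform() are placeholders; a
// real version might dead-letter the record or trigger a clean shutdown
// from a separate thread instead of silently skipping it.
public class GuardedMapping {
    static KStream<String, String> guarded(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("input-topic"); // placeholder topic
        return input.flatMapValues(value -> {
            try {
                return Collections.singletonList(transform(value));
            } catch (RuntimeException e) {
                return Collections.emptyList(); // skip the bad record
            }
        });
    }

    private static String transform(String value) {
        return value.toUpperCase(); // placeholder business logic
    }
}
```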

The “common problem” I was referencing in my original post is having large state stores blown away when any error arises. Unless we are seriously abusing Kafka Streams by holding too much state, I would expect other teams to have this problem. Perhaps they just solve it by leveraging standby replicas for task failover?

There is not much you can do, unfortunately. It’s a known issue that state stores cannot “roll back” right now, and we only have a crude recovery mechanism via wiping out the store and rebuilding it from the changelog.

The only positive news is: it’s WIP to fix it via:

I don’t know though when it will ship… maybe with the 4.1 release, but that’s of course far in the future, and does not help you right now.

For the state listener: it’s also called after the fact, when the tasks are already in a dirty state, and thus you cannot “save” them any longer… Catching exceptions in your own code might help (btw: since AK 3.9, there is a new ProcessingExceptionHandler that could help?). Otherwise, it’s not really possible to do anything.
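A minimal sketch of how that could be wired up, assuming you can move to 3.9 and that the KIP-1033 names are as below (please verify the config key and class name against the 3.9 javadocs):

```java
import java.util.Properties;
import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;

// Rough sketch, assuming the KIP-1033 handler shipped with AK 3.9 (please
// double check the config key and class name against the 3.9 javadocs):
// records whose processing throws are logged and skipped instead of
// killing the stream thread.
public class ProcessingHandlerConfig {
    static void configure(Properties props) {
        props.put("processing.exception.handler",
                  LogAndContinueProcessingExceptionHandler.class.getName());
    }
}
```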

Thank you for explaining this. Glad to hear that there is some work in flight to address this issue. I’ll take a look at the ProcessingExceptionHandler to see how it might help with our application’s errors.