Hey everyone. I’ve got a KStreams Scala application with a pretty basic source-transform-sink topology. The transform operation can (and is expected to) fail <1% of the time. To handle this, we implemented retry-queue (essentially Dead Letter Queue/DLQ) logic.
In the current architecture, we use state stores to hold events that should be retried. The problem we ran into is that the instances run in ephemeral space (essentially k8s), so the state stores would not persist across restarts and those events would be lost.
My fix was to use intermediate topics as retry queues instead, but I’m running into constant rebalancing tied to the max.poll.interval.ms setting. For example, if it’s set to 2 minutes, the app runs for 2 minutes, then rebalances for 2 minutes, and the cycle repeats.
Is there any advice to help fix this, or is my architecture of intermediate retry queues, each with a different ingestion delay, doomed from the start haha.
Thanks for any tips, and let me know if I can provide any additional info that would be helpful.
Update: I think I solved this (still running tests) by setting max.poll.interval.ms higher than the longest delay in the exponential backoff. Since the delay happens inside the consumer’s processing loop, it was exceeding the poll interval and triggering a rebalance each time.
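For reference, here’s a minimal sketch of what that setting looks like in a Streams config. The application id, broker address, and the 10-minute maximum backoff are assumptions, not values from my actual setup:

```scala
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.StreamsConfig

// Assumed: the longest retry delay in the exponential backoff is 10 minutes.
val maxBackoff: Duration = Duration.ofMinutes(10)

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "retry-app")         // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
// Give the consumer more time between poll() calls than the longest in-processor delay,
// otherwise the group coordinator assumes the member is dead and triggers a rebalance.
props.put(
  StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
  (maxBackoff.toMillis + 60000L).toString
)
```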
The problem we ran into is that the instances run in ephemeral space (essentially k8s), so the state stores would not persist across restarts and those events would be lost.
This statement confuses me. Fault-tolerance has nothing to do with store persistence on local disk, but is achieved via change logging. So even if your local state is ephemeral, it should be recovered from the changelog topic. Or did you disable change logging on the store?
Hence, it should not be necessary to use an explicit DLQ topic to make this work.
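To make that concrete, here is a rough sketch of a store builder; the store name and serdes are made up, but it shows that change logging is on by default and only goes away if it is explicitly disabled:

```scala
import java.util.Collections
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.Stores

// Hypothetical store; with logging enabled, a changelog topic backs up the local state.
val retryStoreBuilder = Stores
  .keyValueStoreBuilder(
    Stores.persistentKeyValueStore("retry-store"), // made-up store name
    Serdes.String(),
    Serdes.String()
  )
  .withLoggingEnabled(Collections.emptyMap[String, String]()) // explicit here, but also the default
  // .withLoggingDisabled() // this call is what would make local state loss unrecoverable
```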
About the rebalancing: it seems your Processor is retrying internally, thus “blocking” the StreamThread so poll() cannot be called. So increasing max.poll.interval.ms to a higher value does make sense. You might also need to tune other configs if you still run into issues (e.g., the consumer config max.poll.records, and maybe also the Streams config buffered.records.per.partition).
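As a sketch only (the values are placeholders, not recommendations), those two knobs would look something like this:

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.StreamsConfig

val tuning = new Properties()
// Fewer records per poll() means less retry-induced work between consecutive poll() calls.
tuning.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "100")
// Caps how many records Streams buffers per partition before pausing the consumer.
tuning.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "500")
```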
Hm, I wasn’t aware of how change logging worked, so that will be a good read. It was my impression that state stores would be persisted to an on-disk DB instance like RocksDB.
I had a feeling that explicit DLQs that feed back into the input might not be the best way to go about this, especially since the actual input topic has a much higher volume than the retry-queue inputs.
It was my impression that state stores would be persisted to an on-disk DB instance like RocksDB.
That is correct, but it has nothing to do with fault tolerance. RocksDB is used for two reasons:
1. You can have state that is larger than main memory.
2. If you do have persistent volumes, you can speed up state recovery, because it’s not necessary to replay the full changelog (a rough sketch of pointing state.dir at a persistent volume follows below).
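For illustration only (the mount path is made up): with a persistent volume mounted into the pod, pointing state.dir at it keeps the RocksDB files across restarts, so Streams only has to replay the tail of the changelog instead of the whole thing:

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val stateProps = new Properties()
// Hypothetical persistent-volume mount path; RocksDB files written here survive pod restarts.
stateProps.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/kstreams-state")
```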
I had a feeling that explicit DLQs that feed back into the input might not be the best way to go about this, especially since the actual input topic has a much higher volume than the retry-queue inputs.
This is for sure also possible and not wrong. My main point was to correct the understanding of how state stores work.