Kafka Streams high availability / low latency

Hello.

I’m trying to develop an application that is going to process lots of data, and high availability / low latency are crucial requirements.

Let’s assume that a restart of a single instance takes about 10 seconds. This is way too long, so I’m trying to set up all the bits and pieces so that potential rebalances take as little time as possible.

First, I’m not going to use group.instance.id (static membership) since, as I understand it, the purpose of this parameter is to avoid unnecessary rebalances, and that’s not my case.

Second, I’m setting the internal config internal.leave.group.on.close (to true), since I want my standby instance to take over tasks as soon as possible.

Third, I’m using num.standby.replicas=1, so if an instance dies, another instance is ready to take over its tasks (see the config sketch below).
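For reference, a minimal sketch of this configuration in Java; the application id and broker address are placeholders, and internal.leave.group.on.close is an internal consumer config rather than part of the public API:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class HaPocConfig {
    // Sketch of the setup described above; "ha-poc" and the broker
    // address are placeholders.
    static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ha-poc");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // One standby per stateful task, so a failover needs little catch-up:
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // Internal consumer config: send a LeaveGroup request on close() so
        // the rebalance starts immediately instead of after the session timeout:
        props.put("internal.leave.group.on.close", true);
        // group.instance.id is deliberately NOT set (no static membership).
        return props;
    }
}
```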

So, in my POC, I have a topic with 5 partitions and 5 Kafka Streams instances.
Once I started my application, I got the following distribution:

Instance 1:
New active tasks: [0_3]
New standby tasks: [0_2]
Instance 2:
New active tasks: [0_0]
New standby tasks: [0_4]
Instance 3:
New active tasks: [0_2]
New standby tasks: [0_3]
Instance 4:
New active tasks: [0_1]
New standby tasks: [0_0]
Instance 5:
New active tasks: [0_4]
New standby tasks: [0_1]
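(Side note: the lines above are from the Streams logs; the same information can also be queried programmatically. A sketch, assuming a running KafkaStreams instance called streams and a client version that has metadataForLocalThreads():)

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.ThreadMetadata;

public class AssignmentPrinter {
    // Prints this instance's assignment, similar to the
    // "New active tasks / New standby tasks" log lines.
    static void printAssignment(final KafkaStreams streams) {
        for (final ThreadMetadata thread : streams.metadataForLocalThreads()) {
            System.out.println("Active tasks:  " + thread.activeTasks());
            System.out.println("Standby tasks: " + thread.standbyTasks());
        }
    }
}
```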

Now, let’s shut down instance 5.
My assumption is that task 0_4 will be taken over by Instance 2, since it holds the standby replica. Correct?

After Instance 5 died, we see:

Instance 1:
New active tasks: [0_3]
New standby tasks: [0_2, 0_1]
Instance 2:
New active tasks: [*0_4*, 0_0]
New standby tasks: [0_3]
Instance 3:
New active tasks: [0_2]
New standby tasks: [0_3, 0_1]
Instance 4:
New active tasks: [0_1]
New standby tasks: [0_4, 0_0]

This worked as expected, and I didn’t observe any long processing pauses; a pause of about 1 second is OK.
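(For what it’s worth, one way to measure such pauses is a state listener that times the REBALANCING → RUNNING transition; a sketch, assuming the listener is registered before streams.start():)

```java
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.streams.KafkaStreams;

public class RebalanceTimer {
    // Times REBALANCING -> RUNNING transitions to measure processing pauses.
    static void register(final KafkaStreams streams) {
        final AtomicLong start = new AtomicLong();
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.REBALANCING) {
                start.set(System.currentTimeMillis());
            } else if (oldState == KafkaStreams.State.REBALANCING
                    && newState == KafkaStreams.State.RUNNING) {
                System.out.println("Rebalance took "
                    + (System.currentTimeMillis() - start.get()) + " ms");
            }
        });
    }
}
```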

Now, let’s bring back Instance 5. Before it died, it had state for partition 4 (active) and partition 1 (standby). Does this mean that when Instance 5 rejoins the group, it is most likely to be assigned partitions 4 and 1 again (since some state for those partitions is already present and only some catch-up is required)?
Or is there no such logic?

I’ve now turned Instance 5 back on, and it indeed received partitions 4 and 1. However, I’m not sure if that’s always the case.

Moreover, I see that this happens in two phases. First, the tasks are assigned as standbys:

New active tasks: []
New standby tasks: [0_1, 0_4]

and only after some time does one of the tasks become active:

New active tasks: [0_4]
New standby tasks: [0_1]

Does it always work like that, i.e., an instance first catches up on the state, and only once it’s ready does it receive an active task?

To sum up, it seems like everything worked as expected while I was writing this question. I would like to confirm that it always works like this. Is my understanding of the algorithm OK? Are there any hidden traps?

Is this article:

(also linked in your book, Mastering Kafka Streams and ksqlDB) obsolete?

Have you got any other important information that I should take into consideration?

Does this mean that when Instance 5 rejoins the group, it is most likely to be assigned partitions 4 and 1 again (since some state for those partitions is already present and only some catch-up is required)?
Or is there no such logic?

Yes. The logic is based on what we discover in the local state directory. As we will find old state for both tasks 0_1 and 0_4, it’s taken into account to potentially re-assign both as active and/or standby if possible, to avoid state movement (and expensive changelog recovery). It’s of course always a tradeoff between avoiding state movement and getting a balanced assignment, but in your example we kill two birds with one stone by re-assigning both tasks to Instance 5.
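To illustrate: this discovery only works if the state directory survives the restart. A sketch (the path is just a placeholder):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateDirConfig {
    // Task state lives under <state.dir>/<application.id>/<taskId>
    // (e.g. .../my-app/0_4), next to a .checkpoint file that records the
    // changelog offsets of the locally stored state. If this directory is
    // wiped between restarts, there is no old state to discover.
    static Properties stateDir() {
        final Properties props = new Properties();
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams"); // placeholder
        return props;
    }
}
```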

Does it always work like that, i.e., an instance first catches up on the state, and only once it’s ready does it receive an active task?

Yes. It’s a “warm-up” feature. We have the intent to assign 0_4 as active, but to keep “offline” time small, we first let it “warm up” and replay the changelog before we rebalance again and promote it to an active task. You can control how “aggressively” we promote a standby/warm-up task to active via the acceptable.recovery.lag config (the higher the value, the earlier / more aggressively we promote to active). Of course, after we promote it to active, the task still needs to finish catching up and read to the end of the changelog topic before processing can start. Thus, acceptable.recovery.lag effectively controls how long a newly promoted active task needs to restore from the changelog before it can start processing new input data.
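To make these knobs concrete, a sketch of the related configs (the values shown are the defaults). The delay you observed before the promotion corresponds to the follow-up “probing” rebalance:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class WarmupTuning {
    static Properties warmupConfig() {
        final Properties props = new Properties();
        // Max changelog lag at which a standby/warm-up task counts as
        // "caught up enough" to be promoted to active:
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);
        // How many extra warm-up replicas may exist at a time:
        props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
        // How often a follow-up ("probing") rebalance checks whether a
        // warm-up task has caught up and can be promoted:
        props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 600_000L); // 10 min
        return props;
    }
}
```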

I would not call the article or book obsolete, but of course the article is 4 years old and does not cover the latest improvements. Cf:

In general, if you want to understand the latest and greatest, you would need to check out the release notes of every release and read the corresponding KIPs.


Thank you, @mjsax!
I really appreciate your support.

