How to minimize switchover time between Kafka Streams instances

I have an application with strict downtime requirements. Currently I have an input topic with three partitions, and I am running three instances of the application to handle the data load. In an ideal distribution of the partitions, each instance processes data from its own partition.

My problem occurs when one of the instances is shut down gracefully. When this happens, the other instances are too slow to take over the idle tasks resulting from the shut down instance. I have configured num.standby.replicas = 1, so one of the other instances should be warmed-up and ready to immediately take over the task. However, it seems to take around the default value for session.timeout.ms before the task is migrated to a running instance.

Is it possible to force the instance that is shutting down to leave the group during shutdown, and in that way ensure that the running instances immediately take over the idle task?

What you observe is by design. Many releases ago, Kafka Streams changed how it uses the consumer and disabled the consumer feature that sends a “leave group request” when the consumer is closed. Thus, in your case, a rebalance happens only after session.timeout.ms expires.

The goal was to avoid two rebalances when an instance is bounced. Back in the day, there was no incremental rebalancing yet, and the task assignment was not done as well as we do it today. I am personally not 100% sure if this behavior still makes sense or if we should change it back to normal…

To disable the leave group request, the consumer has an internal config, internal.leave.group.on.close, that Kafka Streams sets to false. It should be possible to override this config and set it to true.

In our case, the “extra” rebalance when an instance bounces is completely fine as rebalancing usually only takes a few milliseconds. Hence, the internal config sounds like a viable option.

Just to clarify, the Stream Properties would look something like this?

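import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

Properties().apply {
    // Re-enable the consumer's leave group request on close (internal config)
    put(StreamsConfig.consumerPrefix("internal.leave.group.on.close"), true)
    // More config below...
}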

Yes, looks correct to me.

@hermanjakobsen a more “stable” way of doing it might be the following:

streams.close(new CloseOptions().leaveGroup(true));

Edit: This does not work, there is actually a bug. Matthias is correct, and you have to use the internal flag. Please see KAFKA-16514, “Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag”, in the ASF JIRA.
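
For anyone on Java rather than Kotlin, here is a minimal sketch of the same configuration written with plain string keys, so the resolved key is visible. It assumes that StreamsConfig.consumerPrefix(...) simply prepends "consumer." to the name. The class name, application id and bootstrap server are hypothetical placeholders and not taken from this thread. Since internal.leave.group.on.close is an internal config, its behavior may change between Kafka versions.

```java
import java.util.Properties;

public class LeaveGroupOnCloseConfig {

    // Builds the Streams properties discussed in this thread using plain string keys.
    static Properties buildStreamsProperties() {
        Properties props = new Properties();
        // Hypothetical placeholders, replace with your own values.
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // One warm standby per task, as in the original question.
        props.put("num.standby.replicas", 1);
        // Assumed to be the key that
        // StreamsConfig.consumerPrefix("internal.leave.group.on.close") resolves to.
        props.put("consumer.internal.leave.group.on.close", true);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting properties for inspection.
        buildStreamsProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object would then be passed to the KafkaStreams constructor as usual. With the flag set, a graceful close should send the leave group request, so the rebalance no longer waits for session.timeout.ms.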