Kafka Streams Timeout During Partition Rebalance - Seeking Insights on NotLeaderOrFollowerException

Hi Kafka Community,

We are running a self-managed Kafka service hosted on an AWS EKS cluster and recently encountered an issue that led to a Kafka Streams timeout, which put the Streams application into an ERROR state. We’re hoping to get some insights into what might have caused this.

Context:

  • The timeout occurred during a partition rebalance that took longer than the default timeout period (1 minute).

  • About half an hour earlier, there had been several rebalances on topics unrelated to the affected one. Those topics had no consumers, so timeouts were not a concern there.

  • During the incident, we observed a NotLeaderOrFollowerException which seems to have triggered the issue.

Observations:

  • The issue appears to have started with the NotLeaderOrFollowerException, which initiated a metadata update and subsequently triggered a rebalance.

  • This rebalance exceeded the default timeout, leading to the timeout error and Kafka Streams going into an ERROR state.

  • We also noticed disconnect exceptions around the same time.

  • There was no broker restart, and we are using ZooKeeper for metadata management (Kafka version: 2.8.2). No errors were observed in ZooKeeper.

LOG

[ERROR] [metric-resource-client-executor-37] [i.d.j.e.IllegalStateExceptionMapper] Error handling a request: baddcb33eda4c876 java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
	at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:350)
	at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1596)
	at metricprocessor.stream.MetricProcessorStreamManager.getMetadata(MetricProcessorStreamManager.java:91)
	at metricprocessor.stream.MetricProcessorStreamManager.findPartitionHost(MetricProcessorStreamManager.java:78)
	at metricprocessor.stream.MetricProcessorStreamManager.findPartitionHost(MetricProcessorStreamManager.java:41)
	at metricprocessor.resource.MetricHostResolver.getHost(MetricHostResolver.java:75)
	at metricprocessor.resource.MetricHostResolver.getMetricHosts(MetricHostResolver.java:47)
	at metricprocessor.resource.MetricFetchTask.doTask(MetricFetchTask.java:84)
	at metricprocessor.resource.MetricFetchRunnable.run(MetricFetchRunnable.java:44)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

[ERROR] [xyz-StreamThread-10] [o.apache.kafka.streams.KafkaStreams] stream-client [xyz] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition xyz could be determined
	at org.apache.kafka.streams.processor.internals.StreamTask.committableOffsetsAndMetadata(StreamTask.java:436)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:395)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition xyz could be determined

[WARN] [kafka-producer-network-thread| xyz-StreamThread-1-producer] [o.a.k.c.producer.internals.Sender] [Producer clientId=xyz xyz-StreamThread-1-producer] Received invalid metadata error in produce request on partition xyz due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now

Our Analysis:

We suspect that the frequent rebalances that occurred earlier may have contributed to the load on Kafka, ultimately leading to the timeout. However, we are uncertain about the root cause of the NotLeaderOrFollowerException that kicked off the sequence of events.

Request for Insights:

Has anyone experienced a similar issue or could provide insights into what might have caused the NotLeaderOrFollowerException in this context? Any guidance on how to prevent this in the future or recommendations on tuning to avoid similar timeouts during rebalances would be greatly appreciated.

Thank you in advance for your help!

Hard to say overall.

NotLeaderOrFollowerException means that a client tried to read from or write to a specific partition via a specific broker, but that broker is not the leader for the partition. This can happen when the broker originally was the leader for the partition but leadership later changed on the broker side. In that case, the client refreshes its metadata to discover the new leader.

As you can see, the producer only logs this as a WARN message and refreshes its metadata internally; no error is raised to the application at this point, as the client handles the situation on its own.
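For context (this is just an illustrative sketch, not something from your setup): these are roughly the producer settings that govern how long the client keeps retrying a retriable error such as NOT_LEADER_OR_FOLLOWER before a send would finally fail. In a Streams application they are passed with the producer prefix; the values below are placeholders, not recommendations.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

static Properties producerRetryConfig() {
    Properties props = new Properties();
    // Retriable errors (like NOT_LEADER_OR_FOLLOWER) are retried after a metadata refresh ...
    props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
    // ... but only until this overall per-record deadline expires (default 120s).
    props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 120_000);
    // Pause between retries.
    props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 100);
    // Force a proactive metadata refresh at least this often, even without errors.
    props.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 300_000);
    return props;
}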

The second error actually happens on the consumer and is thus not related to the producer’s WARN log. (It seems the producer was able to refresh its metadata correctly and just moved on.) Looking at the stack trace, a call to mainConsumer.position(...) timed out; this is not expected, so an IllegalStateException is thrown, killing KafkaStreams. It’s unclear why mainConsumer.position(...) would time out in this case to begin with (it should already have the position buffered), but if it indeed makes an RPC to get the position (i.e., the committed offset), the timeout is most likely a network issue, or broker overload such that the broker simply did not respond in time.
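If you want to make the application more tolerant of this, two knobs are commonly adjusted. This is only a sketch, assuming the 60000ms timeout really comes from a blocking call on the main consumer; the application id, bootstrap servers, and topology below are placeholders for your own.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

static KafkaStreams buildStreams(Topology topology) {   // topology = your existing topology
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metric-processor");   // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");     // placeholder

    // Give the main consumer more than the default 60s for blocking calls such as
    // position()/committed() while a leader election or rebalance is still settling.
    props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 120_000);

    KafkaStreams streams = new KafkaStreams(topology, props);

    // Replace only the failing StreamThread on an unexpected exception instead of
    // shutting down the whole client (KIP-671, available since 2.8).
    streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
    return streams;
}

Note that REPLACE_THREAD only papers over a transient failure; if the underlying cause (e.g. broker overload) persists, the replacement thread will hit the same error.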

HTH.