Hi Kafka Community,
We are running a self-managed Kafka service hosted on AWS EKS Cluster, and recently encountered an issue that led to a Kafka Streams timeout, causing the streams to go into an error state. We’re hoping to get some insights into what might have caused this.
Context:
- The timeout occurred during a partition rebalance that took longer than the default timeout period (1 minute).
- About half an hour earlier, there were several rebalances unrelated to the affected topic. Those involved topics with no consumers, so timeouts were not a concern there.
- During the incident, we observed a NotLeaderOrFollowerException, which appears to have triggered the issue.
Observations:
- The issue appears to have started with the NotLeaderOrFollowerException, which initiated a metadata update and subsequently triggered a rebalance.
- That rebalance exceeded the default timeout, which led to the timeout error and put the Kafka Streams client into the ERROR state (see the handler sketch after this list).
- We also noticed disconnect exceptions around the same time.
- There was no broker restart. We are using ZooKeeper for metadata management (Kafka version 2.8.2), and no errors were observed in ZooKeeper.
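For reference, the ERROR state in the second bullet comes from our uncaught exception handler returning SHUTDOWN_CLIENT (visible in the stream-client log line below). Here is a minimal sketch of that wiring, plus the REPLACE_THREAD alternative we are weighing; everything other than the Kafka Streams API calls (class name, application id, topic names) is a placeholder, not our actual code:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class HandlerSketch {                                  // class name is illustrative only
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metric-processor");   // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder brokers

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");     // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Today: the registered handler opts for SHUTDOWN_CLIENT, so a single
        // TimeoutException drives the whole client into the ERROR state seen in the logs below.
        streams.setUncaughtExceptionHandler(
                exception -> StreamThreadExceptionResponse.SHUTDOWN_CLIENT);

        // Alternative we are weighing: replace only the failed stream thread so a
        // transient timeout during a rebalance does not take down the entire client.
        // streams.setUncaughtExceptionHandler(
        //         exception -> StreamThreadExceptionResponse.REPLACE_THREAD);

        streams.start();
    }
}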
Logs:
[ERROR] [metric-resource-client-executor-37] [i.d.j.e.IllegalStateExceptionMapper] Error handling a request: baddcb33eda4c876 java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:350)
at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1596)
at metricprocessor.stream.MetricProcessorStreamManager.getMetadata(MetricProcessorStreamManager.java:91)
at metricprocessor.stream.MetricProcessorStreamManager.findPartitionHost(MetricProcessorStreamManager.java:78)
at metricprocessor.stream.MetricProcessorStreamManager.findPartitionHost(MetricProcessorStreamManager.java:41)
at metricprocessor.resource.MetricHostResolver.getHost(MetricHostResolver.java:75)
at metricprocessor.resource.MetricHostResolver.getMetricHosts(MetricHostResolver.java:47)
at metricprocessor.resource.MetricFetchTask.doTask(MetricFetchTask.java:84)
at metricprocessor.resource.MetricFetchRunnable.run(MetricFetchRunnable.java:44)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[ERROR] [xyz-StreamThread-10] [o.apache.kafka.streams.KafkaStreams] stream-client [xyz] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition xyz could be determined
at org.apache.kafka.streams.processor.internals.StreamTask.committableOffsetsAndMetadata(StreamTask.java:436)
at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:395)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition xyz could be determined
[WARN] [kafka-producer-network-thread| xyz-StreamThread-1-producer] [o.a.k.c.producer.internals.Sender] [Producer clientId=xyz xyz-StreamThread-1-producer] Received invalid metadata error in produce request on partition xyz due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now
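As a side note on the first stack trace: MetricFetchTask calls queryMetadataForKey() while the client is already in ERROR, which is where the IllegalStateException comes from. A small guard we are considering on our side looks roughly like the following; the helper class and method are hypothetical, and only state() and queryMetadataForKey() are the real KafkaStreams API:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyQueryMetadata;

// Hypothetical guard, roughly where MetricProcessorStreamManager.getMetadata() sits today.
class MetadataGuard {
    // Returns null instead of letting queryMetadataForKey() throw IllegalStateException
    // when the client is not in a queryable state.
    static KeyQueryMetadata queryIfQueryable(KafkaStreams streams, String storeName, String key) {
        State state = streams.state();
        if (state != State.RUNNING && state != State.REBALANCING) {
            return null;
        }
        return streams.queryMetadataForKey(storeName, key, Serdes.String().serializer());
    }
}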
Our Analysis:
We suspect that the frequent rebalances that occurred earlier may have contributed to the load on Kafka, ultimately leading to the timeout. However, we are uncertain about the root cause of the NotLeaderOrFollowerException that kicked off the sequence of events.
Request for Insights:
Has anyone experienced a similar issue, or can anyone provide insight into what might have caused the NotLeaderOrFollowerException in this context? Any guidance on how to prevent this in the future, or recommendations on tuning to avoid similar timeouts during rebalances, would be greatly appreciated.
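For context, this is the kind of tuning we are evaluating on our side. The sketch below is illustrative only; the class name, application id, broker address, and values are placeholders rather than anything we currently run:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TuningSketch {                                   // class name is illustrative only
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metric-processor");   // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder brokers

        // The 60000ms in the stack trace appears to correspond to the consumer's
        // default.api.timeout.ms, which bounds position() lookups; raising it would
        // give a slow rebalance more room before the timeout fires.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 120000);

        // task.timeout.ms (KIP-572) is the Streams-level budget for retrying client
        // TimeoutExceptions; we may raise it from the 5-minute default.
        props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 600000L);

        // More session headroom so brief hiccups are less likely to trigger extra rebalances.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 45000);

        System.out.println(props);
    }
}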
Thank you in advance for your help!