State store may have migrated to another instance after a broker failure

Hi,

First, thanks for the great work done on the Kafka Streams API!

We have a Kafka Streams application (10 instances, 1 thread each, running in Kubernetes) that runs a topology reading data from a source topic and distributing it to multiple state stores, each backed by a changelog topic.

The application also has a Java scheduled task that dumps the contents of the stores to a file every hour.

Each DumpToFileTask is passed the KafkaStreams instance and the name of the store to dump.
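
For context, the wiring looks roughly like the sketch below (simplified; the single-thread executor and the exact interval are approximations of our actual code, and DumpToFileTask is our own Runnable):

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
// DumpToFileTask is our Runnable; it holds the KafkaStreams instance and the store name
executor.scheduleAtFixedRate(new DumpToFileTask(streams, "active-data"), 1, 1, TimeUnit.HOURS);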

It had been working well with Kafka 2.3 for the past year and a half.

Our app recently moved from Kafka 2.3 to Kafka 2.8 (both brokers and the Kafka Streams API).

We encountered the following after a Kafka broker outage:

	2021-07-07 14:05:15,344 INFO  Dumping data for store name active-data at time 2021-07-07T14:05:15.344Z (myorg.DumpToFileTask)
	2021-07-07 14:05:15,344 INFO  Getting local store from store waiter active-data (DumpToFileTask)
	2021-07-07 14:05:15,344 INFO  QueryableStoreWaiter getting local store from store waiter active-data (myorg.QueryableStoreWaiter)
	2021-07-07 14:05:15,344 ERROR Error during creation of dump to file task for active-data. [icm-snapshot-status-generator-0] (myorg.DumpToFileTask)
	org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, active-data, may have migrated to another instance.
	at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:65)
	at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
	at myorg.DumpToFileTask.run(DumpToFileTask.java:89)
	at myorg.healthchecks.MonitoredTask.run(MonitoredTask.java:288)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Our Kafka Streams app eventually reconnects to Kafka and rebalances, but 1 of the 10 instances started having this issue when trying to access the store.

Stream processing from the source topic to the output changelog returns to normal on all instances when Kafka comes back up.
The changelogs keep getting updated.

But on that one instance, the stores became unqueryable by the scheduled DumpToFileTask, with the error above.

When it queries, the faulty app instance obtains the store successfully with:
StoreQueryParameters queryParameters = StoreQueryParameters.fromNameAndType(storeName, queryableStoreType);
streams.store(queryParameters);

But this line generates the InvalidStateStoreException:
readOnlyKeyValueStore.all()
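
Put together, the relevant part of the task's run() method is roughly the following (a simplified sketch; the actual key/value types and the file-writing code are ours and are only placeholders here):

StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> queryParameters =
    StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());
ReadOnlyKeyValueStore<String, String> store = streams.store(queryParameters); // succeeds
try (KeyValueIterator<String, String> it = store.all()) {                     // throws InvalidStateStoreException here
    while (it.hasNext()) {
        KeyValue<String, String> entry = it.next();
        // write entry.key / entry.value to the dump file
    }
}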

We kept getting this issue every time the dump-to-file task ran on that instance. We then had to restart the service, and the problem disappeared.

We noticed in the consumer group description for the input topic, for the instance that was having this issue, that the thread number had changed:
CONSUMER-ID went from
myapp-…-StreamThread-1-consumer
to
myapp-…-StreamThread-2-consumer

The thread id was bumped to 2, even though the instance is configured to run with 1 thread.
Could this hint at something related to internal threading, or the new add/remove thread feature?

I was able to reproduce a similar pattern by removing and adding back a StreamThread (a new feature in 2.8) on the KafkaStreams instance, manually triggered through an admin REST API the application has:
streams.removeStreamThread();
streams.addStreamThread();

Once this sequence is done, the topology keeps processing on the new thread, but the store becomes unqueryable, with the same exception as above.
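
In full, the reproduction looks roughly like this (a sketch with the REST plumbing omitted; the key/value types below are placeholders):

streams.removeStreamThread();  // remove the single StreamThread-1...
streams.addStreamThread();     // ...and add a new one (StreamThread-2)

// Processing resumes on the new thread, but the next query fails:
ReadOnlyKeyValueStore<String, String> store =
    streams.store(StoreQueryParameters.fromNameAndType("active-data", QueryableStoreTypes.keyValueStore()));
store.all(); // -> InvalidStateStoreException: ... may have migrated to another instance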

Hopefully this info will be useful for the people maintaining the Kafka Streams API.

Our workaround has been to restart the instance that has the issue.
If you have a suggestion for recovering more gracefully from the unqueryable store, please let me know.

Kind regards
Francois

We replaced our KafkaStreams uncaught exception handler from

streams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);

to

streams.setUncaughtExceptionHandler(ex -> {
    logger.error("UncaughtExceptionHandler : Unhandled exception in {} - exiting", Thread.currentThread().getName(), ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});

In multiple tests involving restarts of the Kafka cluster while the stream application is running, 1 or 2 of our 10 stream application instances will end up in the UncaughtExceptionHandler.

It shows this error:

2021-08-05 17:44:09,848 ERROR UncaughtExceptionHandler : Unhandled exception in cmd-processor-d8a1c6db-8152-4873-9285-9feb9550ca09-StreamThread-1 - exiting [cmd-processor-d8a1c6db-8152-4873-9285-9feb9550ca09-StreamThread-1]
java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition command-1 could be determined
at org.apache.kafka.streams.processor.internals.StreamTask.committableOffsetsAndMetadata(StreamTask.java:436)
at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:395)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1052)
at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1025)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1010)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

With the SHUTDOWN_CLIENT option, the instances that end up in that "IllegalState" shut down and eventually get respawned by k8s.
Each one reconnects normally once the Kafka brokers come back online, and the state store is then queryable without problems.

Seems you are hitting [KAFKA-13096]: "QueryableStoreProvider is not updated when threads are added/removed/replaced rendering IQ impossible" (ASF JIRA).

It’s fixed for the upcoming 2.8.1 and 3.0.0 releases.

That is great news, thanks Matthias!