State store may have migrated to another instance after a broker failure

Hi,

First, thanks for the great work done on kstream api!

We have a kstream application (10 instance, 1 thread, running in kubernetes) that runs a topology reading data from a source topic and distributing data to multiple state store backed by a changelog topic.

That application also have java Scheduled Task that will output to file the content of the stores every hour.

DumpToFileTasks are passed the KafkaStreams instance and the store name to dump.

It has been working well with kafka 2.3 for the past year and half.

Our app recently moved from kafka 2.3 to kafka 2.8 (brokers and kstream api)

We have encountered the following after a kafka broker outage

	2021-07-07 14:05:15,344 INFO  Dumping data for store name active-data at time 2021-07-07T14:05:15.344Z (myorg.DumpToFileTask)
	2021-07-07 14:05:15,344 INFO  Getting local store from store waiter active-data (DumpToFileTask)
	2021-07-07 14:05:15,344 INFO  QueryableStoreWaiter getting local store from store waiter active-data (myorg.QueryableStoreWaiter)
	2021-07-07 14:05:15,344 ERROR Error during creation of dump to file task for active-data. [icm-snapshot-status-generator-0] (myorg.DumpToFileTask)
	org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, active-data, may have migrated to another instance.
	at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:65)
	at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
	at myorg.DumpToFileTask.run(DumpToFileTask.java:89)
	at myorg.healthchecks.MonitoredTask.run(MonitoredTask.java:288)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

our kstream app eventually reconnect to kafka and rebalance, but 1 of the 10 instances started having this issue when trying to access the store.

Stream Processing of the source topic to output changelog returns to normal when kafka comes back up, on all instances
Changelog keep getting updated.

But the stores became unqueriable by the scheduled DumpToFileTask, with the error above, for that one instance.

When it queries, the faulty app instance finds the store succesfully with:
StoreQueryParameters queryParameters = StoreQueryParameters.fromNameAndType(storeName, queryableStoreType);
streams.store(queryParameters);

But this line generate the InvalidStateStoreException:
readOnlyKeyValueStore.all()

We kept getting this issue, every time the dumpto file task was attempted on that instance. Then we had to restart the service, and the problem disappeared.

We noticed on the consumer group description for the input topic, for the instance that was having this issue, that the thread number had changed:
CONSUMER-ID went from
myapp-…-StreamThread-1-consumer
to
myapp-…-StreamThread-2-consumer

Thread id was bumped to 2, and the instance is configured to run with 1 thread.
Hinting to something related to internal threading, or the new add/remove thread feature?

I was able to reproduce a similar pattern by removing and adding back a StreamThread (new feature in 2.8) on the streams. (Manually triggered through an admin REST api the application has),
streams.removeStreamThread();
streams.addStreamThread();

Once this sequence is done, the topology keeps processing on the new thread, but the store becomes unqueriable, with the same exception above.

Hopefully this info will be useful for people maintaining the kstream api.

Our work around has been to restart the instance that has this issue.
If you have a suggestion to recover more gracefully from the unqueriable store, please let me me know

Kind regards
Francois