This is very similar to a recent post on this forum, but I will share it anyway.
We are running Kafka Streams 2.8 against Kafka 2.8 brokers (3 brokers).
The commands topic has 10 partitions.
The command-expiry-store-changelog topic has 10 partitions.
The events topic has 10 partitions.
The topology is:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [commands])
      --> KSTREAM-TRANSFORM-0000000001
    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])
      --> KSTREAM-TRANSFORM-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-TRANSFORM-0000000002 (stores: [command-expiry-store])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-TRANSFORM-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: events)
      <-- KSTREAM-TRANSFORM-0000000002
Our streams app runs with processing.guarantee set to exactly_once (rather new to us), strictly to ensure the atomicity of a “Move” of data from the store to the events sink topic in punctuator operations.
With at_least_once we occasionally (on network outages) saw an item removed from the store but not pushed to the events topic.
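For reference, here is a minimal sketch of how the guarantee is selected. It is not our actual configuration code: the class and method names are made up for illustration, the bootstrap servers are placeholders, and the application id is only a guess. Plain string keys are used so the snippet compiles without the kafka-streams jar; "processing.guarantee" is the key behind StreamsConfig.PROCESSING_GUARANTEE_CONFIG.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
final class ProcessingGuaranteeConfig {
    static final String EXACTLY_ONCE = "exactly_once";
    static final String AT_LEAST_ONCE = "at_least_once";

    // Builds the properties that would be handed to the KafkaStreams constructor.
    static Properties streamsProperties(String guarantee) {
        // This sketch only covers the two values discussed in this post.
        if (!EXACTLY_ONCE.equals(guarantee) && !AT_LEAST_ONCE.equals(guarantee)) {
            throw new IllegalArgumentException("unsupported guarantee: " + guarantee);
        }
        Properties props = new Properties();
        // Assumed application id; adjust to the real one.
        props.setProperty("application.id", "commands-processor");
        // Placeholder broker addresses.
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.setProperty("processing.guarantee", guarantee);
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties(EXACTLY_ONCE);
        System.out.println(props.getProperty("processing.guarantee")); // prints "exactly_once"
    }
}
```

Switching between the two modes is then a one-argument change, e.g. streamsProperties(ProcessingGuaranteeConfig.AT_LEAST_ONCE).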
After a Kafka outage in which all 3 brokers were killed at the same time, the brokers restarted and initialized successfully.
When we restarted the topology above, one of the tasks never fully initialized; the restore phase kept outputting this message every few minutes:
2021-08-16 14:20:33,421 INFO stream-thread [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] Restoration in progress for 1 partitions. {commands-processor-expiry-store-changelog-8: position=11775908, end=11775911, totalRestored=2002076} [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
The task for partition 8 never initialized, and no more data was read from the source commands topic for that partition.
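In the log line above, the restore position sits 3 offsets short of the end offset (11775911 - 11775908) and never closes the gap. To check whether the position moves at all between messages, something like the following rough sketch could be run over the logs. The class and method names are hypothetical, and the regex only assumes the single-partition message format shown above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log helper, for illustration only.
final class RestoreProgress {
    // Matches "{<changelog-partition>: position=N, end=N, totalRestored=N}".
    // Only the first partition entry in a line is considered.
    private static final Pattern PROGRESS = Pattern.compile(
            "\\{(\\S+): position=(\\d+), end=(\\d+), totalRestored=(\\d+)\\}");

    private static Matcher match(String logLine) {
        Matcher m = PROGRESS.matcher(logLine);
        if (!m.find()) {
            throw new IllegalArgumentException("no restoration progress found in line");
        }
        return m;
    }

    // Name of the changelog partition being restored.
    static String partition(String logLine) {
        return match(logLine).group(1);
    }

    // Offsets still missing before the restore reaches the reported end offset.
    static long remaining(String logLine) {
        Matcher m = match(logLine);
        return Long.parseLong(m.group(3)) - Long.parseLong(m.group(2));
    }

    public static void main(String[] args) {
        String line = "Restoration in progress for 1 partitions. "
                + "{commands-processor-expiry-store-changelog-8: position=11775908, "
                + "end=11775911, totalRestored=2002076}";
        // prints "commands-processor-expiry-store-changelog-8 is 3 offsets short of the end"
        System.out.println(partition(line) + " is " + remaining(line) + " offsets short of the end");
    }
}
```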
The workaround we found to get it going:
We restarted the streams app with processing.guarantee set back to at_least_once; it then proceeded to read the changelog and restored partition 8 fully.
But we noticed afterward that partition 8 of command-expiry-store-changelog was no longer being cleaned/compacted by the log cleaner.
So we resorted to deleting and recreating our command-expiry-store-changelog and events topics and regenerating them from the commands topic, reading from the beginning.
Will [KAFKA-12951] Infinite loop while restoring a GlobalKTable - ASF JIRA fix this issue?
Do you have any other suggestions on what we could have done to fix the issue without resorting to deleting and recreating the topics involved in the transactions of this Kafka Streams app?
Thanks a lot!
Francois