This is very similar to a recent post on this forum, but I will share it anyway.
We are running Kafka Streams 2.8 against a Kafka 2.8 cluster of 3 brokers.
The commands, command-expiry-store-changelog, and events topics each have 10 partitions.
The app runs with this topology:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [commands])
      --> KSTREAM-TRANSFORM-0000000001
    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])
      --> KSTREAM-TRANSFORM-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-TRANSFORM-0000000002 (stores: [command-expiry-store])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-TRANSFORM-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: events)
      <-- KSTREAM-TRANSFORM-0000000002
Our Streams app runs with processing.guarantee=exactly_once (which is rather new to us), solely to make the "move" of data from the store to the events sink topic in our punctuator operations atomic.
With at_least_once, we occasionally (during network outages) get items removed from the store but never pushed to the events topic.
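To make the "move" concrete, here is a minimal, stdlib-only Java model of what such a punctuator does. It is a sketch, not the app's actual code: the map and list are hypothetical stand-ins for the command-expiry-store state store and the events topic, and the class name, the expiry-timestamp value layout, and the punctuate(long) method are assumptions for illustration. In a real Kafka Streams Transformer the loop would go over store.all() and call context.forward(...) and store.delete(...).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only model of a punctuator that "moves" expired entries from a store to a sink.
// Hypothetical stand-ins: 'store' ~ command-expiry-store, 'sink' ~ the events topic.
class ExpiryMoveSketch {

    // key -> expiry timestamp in ms (hypothetical value layout)
    final Map<String, Long> store = new TreeMap<>();
    // records that would be forwarded to the events topic
    final List<String> sink = new ArrayList<>();

    // Called on each punctuation with the current time.
    void punctuate(long now) {
        Iterator<Map.Entry<String, Long>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= now) {
                // 1) forward to the sink (context.forward(...) in Kafka Streams)
                sink.add(e.getKey());
                // 2) delete from the store (store.delete(...), which also writes a tombstone
                //    to the changelog). Under exactly_once both writes belong to one producer
                //    transaction; under at_least_once they are independent writes.
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        ExpiryMoveSketch s = new ExpiryMoveSketch();
        s.store.put("cmd-1", 100L);
        s.store.put("cmd-2", 300L);
        s.punctuate(200L);
        System.out.println(s.sink + " " + s.store.keySet()); // prints "[cmd-1] [cmd-2]"
    }
}
```

The model only shows the ordering of the two writes; the atomicity itself comes from the Kafka transaction that exactly_once wraps around them, which a plain in-memory model cannot reproduce.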
After a Kafka outage in which all 3 brokers were killed at the same time,
the brokers restarted and initialized successfully.
When we restarted the topology above, however, one of the tasks never finished initializing; the restore phase kept logging this message every few minutes:
2021-08-16 14:20:33,421 INFO stream-thread [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] Restoration in progress for 1 partitions. {commands-processor-expiry-store-changelog-8: position=11775908, end=11775911, totalRestored=2002076} [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
The task for partition 8 never initialized, so no more data was read from partition 8 of the source commands topic.
The workaround we found to get it going:
We restarted the Streams app with processing.guarantee set back to at_least_once; it then proceeded to read the changelog and fully restore partition 8.
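The workaround boils down to a single configuration switch. A minimal sketch, assuming plain string keys (in real code these map to StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE and StreamsConfig.AT_LEAST_ONCE); the application.id is inferred from the thread names in the log above, and the bootstrap server and the streamsProps helper are hypothetical placeholders.

```java
import java.util.Properties;

// Hypothetical helper: builds the Streams properties, switching only the processing guarantee.
class StreamsConfigSketch {

    static Properties streamsProps(boolean exactlyOnce) {
        Properties props = new Properties();
        props.put("application.id", "commands-processor"); // inferred from the log's thread names
        props.put("bootstrap.servers", "broker1:9092");     // hypothetical placeholder
        // Kafka Streams 2.8 accepts "at_least_once" (the default), "exactly_once"
        // and "exactly_once_beta". With exactly_once, Streams sets its consumers to
        // isolation.level=read_committed; with at_least_once the consumer default
        // (read_uncommitted) applies.
        props.put("processing.guarantee", exactlyOnce ? "exactly_once" : "at_least_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps(true).getProperty("processing.guarantee"));  // exactly_once
        System.out.println(streamsProps(false).getProperty("processing.guarantee")); // at_least_once
    }
}
```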
However, we noticed afterward that partition 8 of command-expiry-store-changelog was no longer being cleaned/compacted by the log cleaner.
So we resorted to deleting and recreating the command-expiry-store-changelog and events topics and regenerating them from the commands topic, reading from the beginning.
Will KAFKA-12951 (Infinite loop while restoring a GlobalKTable) fix this issue?
Do you have any other suggestions for what we could have done to fix the issue without deleting and recreating the topics involved in this Kafka Streams app's transactions?
Thanks a lot!
Francois