Streams app with EOS gets stuck restoring after upgrade to 2.8

Very similar to a recent post on this forum, but I will share it anyway.

Running with Kafka Streams 2.8 and Kafka brokers 2.8 (3 brokers).

commands topic: 10 partitions
command-expiry-store-changelog topic: 10 partitions
events topic: 10 partitions

with this topology:

 Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [commands])
      --> KSTREAM-TRANSFORM-0000000001
    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])
      --> KSTREAM-TRANSFORM-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-TRANSFORM-0000000002 (stores: [command-expiry-store])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-TRANSFORM-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: events)
      <-- KSTREAM-TRANSFORM-0000000002

Our streams app runs with processing.guarantee set to exactly_once (rather new to us), strictly to ensure that the “move” of data from the store to the sink events topic, done in punctuator operations, happens atomically in one transaction.

With at_least_once we occasionally (on network outages) get an item removed from the store but not pushed to the events topic.
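
For context, the punctuator-based “move” looks roughly like the sketch below. This is not our exact code: the class name, the String types, the 30-second wall-clock interval and the forward-everything expiry check are simplifications for illustration; only the store name (command-expiry-store) and the overall pattern match the topology above. The point is that with exactly_once the forward to the events sink and the delete from the store commit atomically, which at_least_once did not guarantee for us.

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class CommandExpiryTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<String, String>) context.getStateStore("command-expiry-store");

        // Periodically "move" buffered commands from the store to the downstream sink (events topic).
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (final KeyValueIterator<String, String> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, String> entry = it.next();
                    // Under exactly_once the forward and the delete are committed in one transaction;
                    // under at_least_once a failure between the two can lose the event.
                    context.forward(entry.key, entry.value);
                    store.delete(entry.key);
                }
            }
        });
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        store.put(key, value); // buffer the command until the punctuator moves it out
        return null;           // nothing is forwarded on the per-record path
    }

    @Override
    public void close() { }
}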

After a Kafka outage where all 3 brokers were killed at the same time, the brokers restarted and initialized successfully. :wave:

When restarting the topology above, one of the tasks would never fully initialize; the restore phase kept outputting this message every few minutes:


2021-08-16 14:20:33,421 INFO  stream-thread [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] Restoration in progress for 1 partitions. {commands-processor-expiry-store-changelog-8: position=11775908, end=11775911, totalRestored=2002076} [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] (org.apache.kafka.streams.processor.internals.StoreChangelogReader)

The task for partition 8 would never initialize, and no more data would be read from the source commands topic for that partition.

Workaround we found to get it going:

We restarted the streams app with processing.guarantee set back to at_least_once; it then proceeded to read the changelog and fully restore partition 8.

But we noticed afterwards that partition 8 of command-expiry-store-changelog was no longer being cleaned/compacted by the log cleaner.

So we resorted to deleting/recreating our command-expiry-store-changelog and events topics and regenerating them from the commands topic, reading from the beginning. :sob:
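
For reference, the delete/recreate step could be scripted with the Admin client; the snippet below is a rough sketch, not what we actually ran. The bootstrap address is a placeholder, the partition counts come from the topic layout at the top of this post, and the replication factor matches our REPLICATION_FACTOR_CONFIG of 2. Re-reading the commands topic from the beginning additionally requires resetting the application’s committed offsets (for example with the kafka-streams-application-reset tool), which is not shown here.

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class RecreateTopics {
    public static void main(final String[] args) throws Exception {
        try (final Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"))) {

            // Drop the stuck changelog topic and the derived events topic...
            admin.deleteTopics(List.of("command-expiry-store-changelog", "events")).all().get();

            // ...and recreate them with the same layout: 10 partitions, replication factor 2,
            // with the changelog compacted as Kafka Streams expects for store changelogs.
            admin.createTopics(List.of(
                    new NewTopic("command-expiry-store-changelog", 10, (short) 2)
                            .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                            TopicConfig.CLEANUP_POLICY_COMPACT)),
                    new NewTopic("events", 10, (short) 2)
            )).all().get();
        }
    }
}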

Will [KAFKA-12951] Infinite loop while restoring a GlobalKTable (ASF JIRA) fix this issue?

Do you have any other suggestions on what could have been done to fix the issue without resorting to deleting/recreating the topics involved in the transactions of this Kafka Streams app?

Thanks a lot!
Francois

The ticket you linked to only applies to global stores, so it should not fix the issue you observe as you are using a “regular” state store. – Might be an unknown bug…

Maybe best to file a ticket at https://issues.apache.org/jira/browse/KAFKA. I’m not even sure it is a Kafka Streams bug, as you mention that compaction fails after the broker restart… Can you reproduce the issue? Can you reproduce it with a different broker or Kafka Streams version?



Thanks, I will try to do that…
We were able to reproduce it again this morning:
We force-deleted all 3 pods running Kafka.
After that, one of the partitions could not be restored (as reported in the previous post).
For that partition, we noticed this log on the broker:

[2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, command-expiry-store-changelog-9) while trying to send transaction markers for commands-processor-0_9, these partitions are likely deleted already and hence can be skipped (kafka.coordinator.transaction.TransactionMarkerChannelManager)

Then we stopped the Kafka Streams app and restarted Kafka cleanly (with a proper grace period).

Restarting the Kafka Streams app another time, we noticed this message in the app log:

2021-08-27 18:34:42,413 INFO [Consumer clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer, groupId=commands-processor] The following partitions still have unstable offsets which are not cleared on the broker side: [commands-9], this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log [commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Is there any way to clean up that transaction?

Kafka Streams config:
StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 2000
StreamsConfig.REPLICATION_FACTOR_CONFIG, 2
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 24MB
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE
producer.delivery.timeout.ms=120000
consumer.session.timeout.ms=30000
consumer.heartbeat.interval.ms=10000
consumer.max.poll.interval.ms=300000
num.stream.threads=1
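
(For completeness, this is roughly how that configuration is assembled in code. The application id is taken from the group id in the logs above, the bootstrap servers value is a placeholder, and the serde settings are omitted.)

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class CommandsProcessorConfig {
    public static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "commands-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // placeholder
        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 2000);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 24 * 1024 * 1024); // 24 MB
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        // Client-level overrides are passed through with the producer./consumer. prefixes:
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 120000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 10000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 300000);
        return props;
    }
}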

Not sure. It might be best to file a Kafka ticket…