Raising error to the application since no reset policy is configured

Hi Team,

I am working on a Kafka Streams application where the auto.offset.reset property is set to earliest.
The application's internal topics have a retention of 1 day, and the cleanup policy on those topics is delete.
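For reference, the setup described above could be sketched roughly as below. This is a minimal illustration, not the actual application code; the application id is a placeholder, and only the broker host from the log is reused:

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // Client-side configuration as described in the post.
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "app");                          // placeholder id
        props.put("bootstrap.servers", "broker1.kafka.x1.xyz:9094"); // broker from the log
        props.put("auto.offset.reset", "earliest");                  // reset policy from the post
        return props;
    }

    // Topic-level configuration for the internal/changelog topics as described.
    public static Properties changelogTopicConfig() {
        Properties cfg = new Properties();
        cfg.put("retention.ms", String.valueOf(24L * 60 * 60 * 1000)); // 1 day retention
        cfg.put("cleanup.policy", "delete");                            // delete, not compact
        return cfg;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("auto.offset.reset"));
        System.out.println(changelogTopicConfig().getProperty("retention.ms"));
    }
}
```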

I was using kafka-clients 3.1.1 with Spring Boot 2.4.0 and Java 11.
I have now upgraded to kafka-clients 3.7.0 by moving to Spring Boot 3.3.8 and Java 21.

After the upgrade, I started getting an intermittent issue like this:

raising error to the application since no reset policy is configured
2025-02-24 15:32:56,260 [app-3cff9a6f-8fbc-48ce-b080-898bcb5cf2e4-StreamThread-1] WARN o.a.k.s.p.i.StoreChangelogReader - stream-thread [app-3cff9a6f-8fbc-48ce-b080-898bcb5cf2e4-StreamThread-1] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [app-internal.test-changelog-9], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later.
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=204659, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker1.kafka.x1.xyz:9094 (id: 1 rack: null)], epoch=0}} is out of range for partition app-internal.test-changelog-9
at org.apache.kafka.clients.consumer.internals.FetchCollector.handleInitializeErrors(FetchCollector.java:348) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.FetchCollector.initialize(FetchCollector.java:230) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:110) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:666) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.pollRecordsFromRestoreConsumer(StoreChangelogReader.java:494) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:450) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1134) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar!/:?]
2025-02-24 15:32:56,262 [app-3cff9a6f-8fbc-48ce-b080-898bcb5cf2e4-StreamThread-1] WARN o.a.k.s.p.i.StreamThread - stream-thread [app-3cff9a6f-8fbc-48ce-b080-898bcb5cf2e4-StreamThread-1] Detected the states of tasks [1_9] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [1_9] are corrupted and hence need to be re-initialized
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.pollRecordsFromRestoreConsumer(StoreChangelogReader.java:507) ~[kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:450) ~[kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1134) ~[kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) ~[kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar!/:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=204659, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker1.kafka.x1.xyz:9094 (id: 1 rack: null)], epoch=0}} is out of range for partition app-internal.test-changelog-9
at org.apache.kafka.clients.consumer.internals.FetchCollector.handleInitializeErrors(FetchCollector.java:348) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.FetchCollector.initialize(FetchCollector.java:230) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:110) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:666) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.pollRecordsFromRestoreConsumer(StoreChangelogReader.java:494) ~[kafka-streams-3.7.0.jar!/:?]
… 5 more

What is your question?

As you can see from the log, it's not an error but a WARN log. The issue comes from a changelog topic, partition [app-internal.test-changelog-9]; this means that the locally stored offset for this partition (i.e., from the local .checkpoint file) is out of range. It's not totally clear why. On a clean shutdown of the application, the checkpointed offset should match the topic end offset.
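For context, the .checkpoint file mentioned above is a small plain-text file in each task's state directory. A minimal sketch of reading one follows; note that the format (a version line, an entry count, then one `topic partition offset` line per changelog partition) is an internal Streams detail that may change between versions, and the sample content here is made up to mirror the partition and offset from the log:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CheckpointReader {
    // Parses a Streams .checkpoint file: line 1 = format version, line 2 = entry
    // count, then one "topic partition offset" line per changelog partition.
    public static Map<String, Long> read(Path file) throws IOException {
        List<String> lines = Files.readAllLines(file);
        int count = Integer.parseInt(lines.get(1).trim());
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            String[] parts = lines.get(2 + i).trim().split("\\s+");
            offsets.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        // Sample content mirroring the partition/offset from the log above.
        Path tmp = Files.createTempFile("checkpoint", null);
        Files.writeString(tmp, "0\n1\napp-internal.test-changelog 9 204659\n");
        System.out.println(read(tmp));
        Files.deleteIfExists(tmp);
    }
}
```

Comparing the offsets parsed this way against the broker's current start offsets for the changelog partitions would show whether the checkpoint had indeed fallen behind the retention window.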

Also note that the error is translated into a TaskCorruptedException, and Kafka Streams should recover from it automatically:

Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [1_9] are corrupted and hence need to be re-initialized
