Hi Team,
I am working on a Kafka Streams application where the auto.offset.reset property is set to earliest.
The internal topics are configured with a retention of 1 day and a cleanup policy of delete.
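The settings described above can be sketched roughly as follows, using plain string keys so that the snippet needs no Kafka dependency. The class and method names are hypothetical and only for illustration, and expressing the 1-day retention as retention.ms is an assumption (the topic could equally have been configured another way).

```java
import java.util.Properties;

// Hypothetical holder class, for illustration only (not the actual application code).
public class StreamsSetupSketch {

    // Client-side setting named in the post.
    static Properties clientProps() {
        Properties p = new Properties();
        p.setProperty("auto.offset.reset", "earliest");
        return p;
    }

    // Topic-level settings for the internal topics.
    // Assumption: the 1-day retention is set through retention.ms.
    static Properties internalTopicProps() {
        Properties p = new Properties();
        p.setProperty("cleanup.policy", "delete");
        p.setProperty("retention.ms", String.valueOf(24L * 60 * 60 * 1000)); // 1 day in milliseconds
        return p;
    }

    public static void main(String[] args) {
        System.out.println(clientProps());
        System.out.println(internalTopicProps());
    }
}
```

With cleanup.policy set to delete, the broker removes log segments older than the retention window, which is the kind of truncation the warning in the log refers to.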
I was using kafka-clients version 3.1.1 with Spring Boot 2.4.0 and Java 11.
I have now upgraded kafka-clients to 3.7.0 by moving to Spring Boot 3.3.8 and Java 21.
Since the upgrade I have started getting an intermittent issue like this:
raising error to the application since no reset policy is configured
2025-02-24 15:32:56,260 [app-3cff9a6f-8fbc-48ce-b080-898bcb5cf2e4-StreamThread-1] WARN o.a.k.s.p.i.StoreChangelogReader - stream-thread [app-3cff9a6f-8fbc-48ce-b080-898bcb5cf2e4-StreamThread-1] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [app-internal.test-changelog-9], it is likely that the consumer’s position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later.
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=204659, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker1.kafka.x1.xyz:9094 (id: 1 rack: null)], epoch=0}} is out of range for partition app-internal.test-changelog-9
at org.apache.kafka.clients.consumer.internals.FetchCollector.handleInitializeErrors(FetchCollector.java:348) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.FetchCollector.initialize(FetchCollector.java:230) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:110) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:666) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.pollRecordsFromRestoreConsumer(StoreChangelogReader.java:494) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:450) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1134) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar!/:?]
2025-02-24 15:32:56,262 [app-3cff9a6f-8fbc-48ce-b080-898bcb5cf2e4-StreamThread-1] WARN o.a.k.s.p.i.StreamThread - stream-thread [app-3cff9a6f-8fbc-48ce-b080-898bcb5cf2e4-StreamThread-1] Detected the states of tasks [1_9] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [1_9] are corrupted and hence need to be re-initialized
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.pollRecordsFromRestoreConsumer(StoreChangelogReader.java:507) ~[kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:450) ~[kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1134) ~[kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) ~[kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) [kafka-streams-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar!/:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=204659, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker1.kafka.x1.xyz:9094 (id: 1 rack: null)], epoch=0}} is out of range for partition app-internal.test-changelog-9
at org.apache.kafka.clients.consumer.internals.FetchCollector.handleInitializeErrors(FetchCollector.java:348) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.FetchCollector.initialize(FetchCollector.java:230) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:110) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:666) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.7.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.pollRecordsFromRestoreConsumer(StoreChangelogReader.java:494) ~[kafka-streams-3.7.0.jar!/:?]
… 5 more