InvalidProducerEpochException causing a duplicate despite EOS

Please help me understand what is wrong with the code or the config, as the behaviour seems pretty bizarre to me and I am not sure I understand it correctly.

We have a Kafka Streams application. The main idea is to forward the original data once a given message has successfully reached the end of the flow, so we wait for the message to be present on both the first and the last topic of the flow. The solution is based on the Processor API and state stores.
The problem is that we are observing an InvalidProducerEpochException. It is a rare case: for example, during performance testing at 50 messages per second (50k messages in total), the error occurs in roughly one out of five runs. But the implication is disturbing: after the stream restarts, one message (MESSAGE_1) gets duplicated, while the rest are sent successfully. Logs:

2024-06-10 11:53:49,661 INFO *.processor.MatchingProcessor [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message with key MESSAGE_1 was sent to output.
2024-06-10 11:53:49,661 INFO *.processor.InputSentProcessor [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_2: Adding sent data
2024-06-10 11:54:18,574 INFO *.services.id$Companion [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_2: Consumed id package from topic *
2024-06-10 11:54:18,594 INFO *.services.id$Companion [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Send package consumed info to topic *
2024-06-10 11:54:18,594 INFO *.services.EventFactory [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_2: Getting data for message
2024-06-10 11:54:18,594 INFO *.services.EventFactory [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_2: Corresponding data: *
2024-06-10 11:54:18,595 INFO *.processor.MatchingProcessor [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message with key MESSAGE_2 was sent to output.
2024-06-10 11:54:18,595 INFO *.processor.InputSentProcessor [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_3: Adding sent data
2024-06-10 11:54:18,599 INFO *.services.id$Companion [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_3: Consumed id package from topic *
2024-06-10 11:54:18,630 INFO *.services.id$Companion [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Send package consumed info to topic *
2024-06-10 11:54:18,630 INFO *.services.EventFactory [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_3: Getting data for message
2024-06-10 11:54:18,630 INFO *.services.EventFactory [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_3: Corresponding data: *
2024-06-10 11:54:18,630 INFO *.processor.MatchingProcessor [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message with key MESSAGE_3 was sent to output.
2024-06-10 11:54:18,630 INFO *.processor.InputSentProcessor [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_4: Adding sent data
2024-06-10 11:54:18,634 INFO *.services.id$Companion [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_4: Consumed id package from topic *
2024-06-10 11:54:18,642 INFO *.services.id$Companion [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Send package consumed info to topic *
2024-06-10 11:54:18,643 INFO *.services.EventFactory [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_4: Getting data for message
2024-06-10 11:54:18,643 INFO *.services.EventFactory [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message MESSAGE_4: Corresponding data: *
2024-06-10 11:54:18,643 INFO *.processor.MatchingProcessor [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Message with key MESSAGE_4 was sent to output.
2024-06-10 11:54:18,653 ERROR org.apache.kafka.streams.processor.internals.RecordCollectorImpl [kafka-producer-network-thread | service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1-producer] stream-thread [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] stream-task [0_2] Error encountered sending record to topic service-application-SENT-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
2024-06-10 11:54:18,656 INFO org.apache.kafka.clients.producer.internals.TransactionManager [kafka-producer-network-thread | service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1-producer] [Producer clientId=service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1-producer, transactionalId=service-application-d29124b7-ee33-456f-82fc-05d58665120e-1] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
2024-06-10 11:54:18,659 INFO org.apache.kafka.clients.producer.internals.TransactionManager [kafka-producer-network-thread | service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1-producer] [Producer clientId=service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1-producer, transactionalId=service-application-d29124b7-ee33-456f-82fc-05d58665120e-1] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
2024-06-10 11:54:18,659 INFO org.apache.kafka.clients.producer.internals.TransactionManager [kafka-producer-network-thread | service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1-producer] [Producer clientId=service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1-producer, transactionalId=service-application-d29124b7-ee33-456f-82fc-05d58665120e-1] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
2024-06-10 11:54:18,659 WARN org.apache.kafka.streams.processor.internals.StreamThread [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] stream-thread [service-application-d29124b7-ee33-456f-82fc-05d58665120e-StreamThread-1] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.
org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic service-application-SENT-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:303)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)
at io.opentelemetry.javaagent.instrumentation.kafkaclients.ProducerCallback.onCompletion(ProducerCallback.java:36)
at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1538)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:312)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:273)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:237)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:830)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:819)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:771)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:627)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:612)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$8(Sender.java:917)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:349)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:252)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.

This changelog topic belongs to one of our state stores, so the issue looks similar to InvalidProducerEpochException when running EOS and utilizing state stores, but in our case it is pretty rare and it causes this duplicate despite EOS. Also, we are not performing any heavy operations on these stores, just get, put and delete. Here is the code:

Config:

Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.applicationId)
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.bootstrapServers)
    put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass.name)
    put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde::class.java.name)
    put(
        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
        kafkaProperties.specificAvroReader.toBoolean()
    )
    put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.schemaRegistry)
    put(
        StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueExceptionHandler::class.java
    )
    put(StreamsConfig.STATE_DIR_CONFIG, kafkaProperties.stateDirectory)
    put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)
}
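
In case it is relevant: we have not overridden any transaction-related settings, so the defaults apply (the error in the log says the producer was fenced, which is why these timing-related properties might matter). Pinning them explicitly would look roughly like the sketch below; the values are only what we believe the EOS defaults to be, not something we have tuned:

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.streams.StreamsConfig

// Hypothetical helper: pin the transaction-related settings explicitly.
// We have NOT actually changed them; the values below are what we believe
// the EOS defaults to be (commit.interval.ms = 100 ms under EOS and a
// producer transaction.timeout.ms of 10 s set by Streams).
fun Properties.withExplicitEosTimeouts(): Properties = apply {
    // How often Streams commits, i.e. how long a single transaction stays open.
    put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L)
    // Producer-level transaction timeout, passed through with the producer prefix.
    put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 10_000)
}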

Saving to the state store:

open class InputSentProcessor(
    private val kafkaSentDlqStringProducer: KafkaProducer<String, String>
) : ProcessorSupplier<String, String, String, Unit> {

    override fun get(): Processor<String, String, String, Unit> {

        return object : Processor<String, String, String, Unit> {

            lateinit var context: ProcessorContext<String, Unit>
            lateinit var sentStore: KeyValueStore<String, String>

            override fun init(context: ProcessorContext<String, Unit>) {
                this.context = context
                this.sentStore = context.getStateStore(StoreTypes.SENT.name)
            }

            override fun process(record: Record<String, String>) {
                try {
                    processSentRecord(record)
                } catch (e: Exception) {
                    logger.error("Message ${record.key()}: Exception while processing the message: ${e.message}")
                    sendMessageSentToDLQ(record)
                }
            }

            private fun sendMessageSentToDLQ(record: Record<String, String>) {
                kafkaSentDlqStringProducer.sentToDLQ(
                    record.value(),
                    record.key(),
                    "topicSentDLQ",
                    logger
                )
            }

            private fun processSentRecord(record: Record<String, String>) {
                sentStore.put(record.key(), record.value())
                logger.info("Message ${record.key()}: Adding sent data")
                context.forward(record.withValue(null))
            }
        }
    }

    companion object {
        private val logger = LoggerFactory.getLogger(InputSentProcessor::class.java)
    }
}

Matching messages and cleaning the stores:

open class MatchingProcessor(
    private val dlqStringProducer: KafkaProducer<String, String>
) : ProcessorSupplier<String, Any, String, Event> {

    override fun get(): Processor<String, Any, String, Event> {
        return object : Processor<String, Any, String, Event> {

            lateinit var context: ProcessorContext<String, Event>
            lateinit var idStore: KeyValueStore<String, Event>
            lateinit var sentStore: KeyValueStore<String, String>
            lateinit var failedStore: KeyValueStore<String, String>

            override fun init(context: ProcessorContext<String, Event>) {
                this.context = context
                this.idStore = context.getStateStore(StoreTypes.ID.name)
                this.sentStore = context.getStateStore(StoreTypes.SENT.name)
                this.failedStore = context.getStateStore(StoreTypes.FAILED.name)
            }

            override fun process(record: Record<String, Any>) {
                try {
                    record.key().let { key ->
                        when {
                            idStore[key] != null && sentStore[key] != null -> {
                                // composeEvent uses a separate consumer (outside of the stream's scope) to fetch a new, unique id from a dedicated topic
                                context.forward(record.withValue(composeEvent(key, sentStore[key], idStore[key])))

                                idStore.delete(key)
                                sentStore.delete(key)

                                logger.info("Message with key $key was sent to output")
                            }

                            idStore[key] != null && failedStore[key] != null -> {
                                idStore.delete(key)
                                failedStore.delete(key)
                                logger.info("Message with key $key was failed to send")
                            }

                            else -> logger.info("Message with key $key - waiting for matching records")
                        }
                    }
                } catch (exception: Exception) {
                    sentStore[record.key()]?.let { message ->
                        dlqStringProducer.sentToDLQ(message, record.key(), "topicSentDLQ", logger)
                    }

                    failedStore[record.key()]?.let { message ->
                        dlqStringProducer.sentToDLQ(message, record.key(), "topicFailedDLQ", logger)
                    }
                }
            }
        }
    }

    companion object {
        private val logger = LoggerFactory.getLogger(MatchingProcessor::class.java)
    }
}
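
For completeness, the topology is wired roughly like this (a simplified sketch: topic names and the producer instance are placeholders for our real ones, the ID and FAILED stores are registered the same way as SENT, and everything not relevant to the question is omitted):

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.state.Stores

// Simplified wiring sketch; "input-topic", "result-topic" and "output-topic"
// are placeholders. StoreTypes, InputSentProcessor and MatchingProcessor are
// the classes shown above.
fun buildTopology(dlqProducer: KafkaProducer<String, String>): Topology =
    Topology().apply {
        // First topic of the flow: remember the original message in the SENT store.
        addSource("input-source", "input-topic")
        addProcessor("input-sent", InputSentProcessor(dlqProducer), "input-source")

        // Last topic of the flow: match against the stores and forward the event.
        addSource("result-source", "result-topic")
        addProcessor("matching", MatchingProcessor(dlqProducer), "result-source")
        addSink("output", "output-topic", "matching")

        // The SENT store is attached to both processors; ID and FAILED are
        // registered the same way but attached to "matching" only.
        addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(StoreTypes.SENT.name),
                Serdes.String(),
                Serdes.String()
            ),
            "input-sent", "matching"
        )
    }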

So I think the delete is the operation that never makes it to the changelog topic, which is why the record is still there. But I am not sure why it was not deleted from the second store, and why only the first message gets duplicated (maybe because it is the one whose delete failed? Forwarding to the sink happens before the cleanup, so the message was sent but the delete was not committed). Also, can a delete operation really take that long? Or maybe the fault lies with the additional consumer used under the hood (I know that this kind of approach does not guarantee exactly-once, source).

I would be grateful for some hints to fully understand the issue. We are planning to simply switch the order of operations and delete before sending, but our long-term goal will probably be to switch to a KStream-based approach and get rid of these state stores, as our business requirements demand no duplicates.
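
To make that plan concrete, the reordering we have in mind would change the matched branch roughly like this (just a sketch; we are aware that under EOS the store deletes, their changelog records and the forwarded event are still committed in the same transaction, so only the order inside it changes):

// Hypothetical reordering inside MatchingProcessor.process(): delete first,
// forward afterwards.
when {
    idStore[key] != null && sentStore[key] != null -> {
        val event = composeEvent(key, sentStore[key], idStore[key])

        idStore.delete(key)
        sentStore.delete(key)

        context.forward(record.withValue(event))
        logger.info("Message with key $key was sent to output")
    }
    // ... other branches unchanged
    else -> logger.info("Message with key $key - waiting for matching records")
}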

Thank you in advance for all answers.

Not 100% sure… Maybe you are hitting KAFKA-16017, "Checkpointed offset is incorrect when task is revived and restoring"?