InvalidProducerEpochException when running EOS and utilizing state stores

I’m struggling with the following error message happening rather frequently in my Kafka Streams application:

stream-thread [<app_id>-40723d6e-c6e2-47d7-9426-e22da29bcd17-StreamThread-1] stream-task [1_1] Error encountered sending record to topic <app_id>-<store_name>-changelog for task 1_1 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out

The Streams configuration is

Properties().apply {
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.bootstrapServers)
        put(StreamsConfig.APPLICATION_ID_CONFIG, appConfig.applicationId)
        put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()::class.java.name)
        put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde::class.java)
        put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler::class.java)
        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)
        put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)
        put(StreamsConfig.STATE_DIR_CONFIG, appConfig.stateDir)
        put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest")
        put(StreamsConfig.consumerPrefix("internal.leave.group.on.close"), true)
        put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, appConfig.schemaRegistry)
    }

The Streams topology consists of some stateless filter steps and some custom, stateful Processors. The Processors are not doing any complex calculations, but they are frequently accessing the state stores. The implementation of the processor giving the InvalidProducerEpochException is

class PunctuateProcessor(
    private val config: PunctuateProcessorConfig,
    private val groupStoreName: String,
    private val punctuateStoreName: String,
    private val storeSerde: Serde<AvroObject>,
) : ContextualProcessor<String, AvroObject, String, List<AvroObject>>() {
    private lateinit var groupStore: WindowStore<String, MutableList<KafkaRecordWithHeaders>>
    private lateinit var punctuateStore: KeyValueStore<String, List<KafkaRecordWithHeaders>>
    private var hasForwardedThisInterval = false

    override fun init(context: ProcessorContext<String, List<AvroObject>>) {
        super.init(context)
        groupStore = context.getStateStore(groupStoreName)
        punctuateStore = context.getStateStore(punctuateStoreName)
        context().schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME) {
            schedule()
        }
    }

    override fun process(record: Record<String, AvroObject>) {
        groupMeasurementBasedOnLocationAndInterval(record)
    }

    private fun groupMeasurementBasedOnLocationAndInterval(record: Record<String, AvroObject>) {
        val recordValue = record.value()
        val timeIntervalStart = floorSeconds(recordValue.dateTime)
        val storedInterval = groupStore.fetch(recordValue.mrid, timeIntervalStart)
        val recordsInInterval = storedInterval ?: mutableListOf()

        recordsInInterval.add(record.storify(storeSerde))
        groupStore.put(recordValue.mrid, recordsInInterval, timeIntervalStart)
    }

    private fun schedule() {
        if (!isTimeToForward(config.forwardSecond)) {
            hasForwardedThisInterval = false
            return
        }

        if (hasForwardedThisInterval) {
            return
        }

        updatePunctuateStoreWithNewestInterval()
        forwardMeasurements()
        hasForwardedThisInterval = true
    }

    private fun isTimeToForward(publishTime: Duration): Boolean {
        return context().currentSystemTimeMs() % 10_000 >= publishTime.toMillis()
    }

    private fun updatePunctuateStoreWithNewestInterval() {
        val intervalEndTimestamp = floorSeconds(context().currentSystemTimeMs())
        val intervalStartTimestamp = intervalEndTimestamp - 10_000

        groupStore.fetchAll(intervalStartTimestamp, intervalEndTimestamp - 1).use { iterator ->
            val newestMeasurementValuesByLocation =
                iterator.asSequence()
                    .map { KeyValue(it.key.key(), it.value.toList()) }
                    .toList()
            punctuateStore.putAll(newestMeasurementValuesByLocation)
        }
    }

    private fun forwardMeasurements() {
        punctuateStore.all().use { iterator ->
            iterator.forEachRemaining { newestReceivedInterval ->
                val storedIntervalRecords = newestReceivedInterval.value.map { it.unStorify(storeSerde) }
                val measurementValues = storedIntervalRecords.map { it.value() }
                val recordTimestamp = measurementValues.maxBy { it.dateTime }.dateTime

                val recordToForward =
                    Record(
                        storedIntervalRecords.last().value().mrid,
                        measurementValues,
                        recordTimestamp,
                        storedIntervalRecords.last().headers(),
                    )
                context().forward(recordToForward)
            }
        }
    }
}

I would expect Kafka Streams to work fine “out-of-the-box” and not throw exceptions when all the default configurations are used? Why am I experiencing these exceptions?

The exceptions cause the application to stop producing data to the output topic for 1 to 2 minutes while the producer restarts. Unfortunately, this is not acceptable for my business case.

I would expect Kafka Streams to work fine “out-of-the-box” and not throw exceptions when all the default configurations are used?

That’s not necessarily a valid assumption. As a matter of fact, Kafka Streams ships certain configs that are not optimized for production deployment to begin with, but are optimized for local development. Cf https://kafka.apache.org/36/documentation/streams/developer-guide/config-streams.html#id6 and note the config categorization into “high”, “medium”, “low”.

Last but not least, it always depends on your program…

Looking at the code, I see that you are using a potentially heavyweight punctuator. It does a groupStore.fetchAll as well as a punctuateStore.all(). I'm not sure how much data you accumulate, but both operations sound expensive. As long as the punctuation is running, the open TX cannot be committed, and if you hit transaction.timeout.ms, the TX might get aborted broker-side, which could explain the exception you see.
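If the punctuation really does run longer than the transaction timeout, one knob to experiment with is the producer's transaction.timeout.ms. Below is a minimal Kotlin sketch, assuming the Properties object from the question. The helper name withTransactionTimeout and the 60-second value are illustrative assumptions, not recommendations. As far as I know, Kafka Streams sets this timeout to 10 seconds by default when exactly-once is enabled, and the value must not exceed the broker's transaction.max.timeout.ms.

```kotlin
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.streams.StreamsConfig

// Hypothetical helper (not part of Kafka Streams): copies the given Streams
// properties and overrides the producer's transaction timeout.
fun withTransactionTimeout(base: Properties, timeoutMs: Int): Properties =
    Properties().apply {
        putAll(base)
        // Resolves to the key "producer.transaction.timeout.ms".
        put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), timeoutMs)
    }
```

Usage would look like `KafkaStreams(topology, withTransactionTimeout(props, 60_000))`, where `topology` and `props` stand for your existing objects. A larger timeout only buys headroom; it does not make the punctuation itself any cheaper.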

It might be good to peek into the JMX metrics (see the Apache Kafka documentation), i.e., punctuate-latency-avg and punctuate-latency-max.
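If attaching a JMX client is inconvenient, the same metrics should also be reachable from inside the application via KafkaStreams.metrics(). A rough Kotlin sketch follows. The function name punctuateLatencyLines is a hypothetical helper, and it simply filters on the metric name prefix mentioned above.

```kotlin
import org.apache.kafka.common.Metric
import org.apache.kafka.common.MetricName

// Hypothetical helper: picks the punctuate-latency metrics out of a metrics
// map (such as the one returned by KafkaStreams.metrics()) and renders each
// one as a "group tags name = value" line.
fun punctuateLatencyLines(metrics: Map<MetricName, Metric>): List<String> =
    metrics.entries
        .filter { it.key.name().startsWith("punctuate-latency") }
        .map { (name, metric) ->
            "${name.group()} ${name.tags()} ${name.name()} = ${metric.metricValue()}"
        }
```

For example, `punctuateLatencyLines(streams.metrics()).forEach(::println)` could be called periodically, with `streams` being your running KafkaStreams instance. If the maximum gets close to transaction.timeout.ms, that would support the timeout theory.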

A TX that is aborted via timeout is also logged broker-side.

HTH.

Thank you for the tips! I will look into it.

And just out of curiosity: Are there some general best practices when implementing logic utilizing StateStore in a Processor?

Maybe check out this talk: The Nuts and Bolts of Kafka Streams---An Architectural Deep Dive | Current 2023

It might help.
