I’m struggling with the following error, which occurs rather frequently in my Kafka Streams application:
stream-thread [<app_id>-40723d6e-c6e2-47d7-9426-e22da29bcd17-StreamThread-1] stream-task [1_1] Error encountered sending record to topic <app_id>-<store_name>-changelog for task 1_1 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out
The Streams configuration is:
Properties().apply {
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.bootstrapServers)
    put(StreamsConfig.APPLICATION_ID_CONFIG, appConfig.applicationId)
    put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()::class.java.name)
    put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde::class.java)
    put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler::class.java)
    put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)
    put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)
    put(StreamsConfig.STATE_DIR_CONFIG, appConfig.stateDir)
    put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest")
    put(StreamsConfig.consumerPrefix("internal.leave.group.on.close"), true)
    put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, appConfig.schemaRegistry)
}
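For completeness, these properties are used to build and start the KafkaStreams instance in the usual way. This is a minimal sketch; buildTopology() is a placeholder for the actual topology construction (a simplified version of it is sketched after the processor implementation below):

import org.apache.kafka.streams.KafkaStreams

// Minimal sketch: `properties` is the Properties object shown above and
// buildTopology() stands in for the real topology construction.
val streams = KafkaStreams(buildTopology(), properties)
streams.start()

// Close the Streams client cleanly on JVM shutdown.
Runtime.getRuntime().addShutdownHook(Thread(streams::close))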
The Streams topology consists of some stateless filter steps and some custom, stateful Processors. The Processors do not perform any complex calculations, but they access the state stores frequently. The implementation of the processor that triggers the InvalidProducerEpochException is shown below (a simplified sketch of how it is wired into the topology follows the implementation):
class PunctuateProcessor(
    private val config: PunctuateProcessorConfig,
    private val groupStoreName: String,
    private val punctuateStoreName: String,
    private val storeSerde: Serde<AvroObject>,
) : ContextualProcessor<String, AvroObject, String, List<AvroObject>>() {

    private lateinit var groupStore: WindowStore<String, MutableList<KafkaRecordWithHeaders>>
    private lateinit var punctuateStore: KeyValueStore<String, List<KafkaRecordWithHeaders>>
    private var hasForwardedThisInterval = false

    override fun init(context: ProcessorContext<String, List<AvroObject>>) {
        super.init(context)
        groupStore = context.getStateStore(groupStoreName)
        punctuateStore = context.getStateStore(punctuateStoreName)
        // Wall-clock punctuation every second.
        context().schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME) {
            schedule()
        }
    }

    override fun process(record: Record<String, AvroObject>) {
        groupMeasurementBasedOnLocationAndInterval(record)
    }

    // Append the incoming record to the window store entry for its location (mrid) and time interval.
    private fun groupMeasurementBasedOnLocationAndInterval(record: Record<String, AvroObject>) {
        val recordValue = record.value()
        val timeIntervalStart = floorSeconds(recordValue.dateTime)
        val storedInterval = groupStore.fetch(recordValue.mrid, timeIntervalStart)
        val recordsInInterval = storedInterval ?: mutableListOf()
        recordsInInterval.add(record.storify(storeSerde))
        groupStore.put(recordValue.mrid, recordsInInterval, timeIntervalStart)
    }

    // Runs on every punctuation; forwards at most once per interval.
    private fun schedule() {
        if (!isTimeToForward(config.forwardSecond)) {
            hasForwardedThisInterval = false
            return
        }
        if (hasForwardedThisInterval) {
            return
        }
        updatePunctuateStoreWithNewestInterval()
        forwardMeasurements()
        hasForwardedThisInterval = true
    }

    private fun isTimeToForward(publishTime: Duration): Boolean {
        return context().currentSystemTimeMs() % 10_000 >= publishTime.toMillis()
    }

    // Copy the grouped records of the most recent 10-second interval from the window store into the punctuate store.
    private fun updatePunctuateStoreWithNewestInterval() {
        val intervalEndTimestamp = floorSeconds(context().currentSystemTimeMs())
        val intervalStartTimestamp = intervalEndTimestamp - 10_000
        groupStore.fetchAll(intervalStartTimestamp, intervalEndTimestamp - 1).use { iterator ->
            val newestMeasurementValuesByLocation =
                iterator.asSequence()
                    .map { KeyValue(it.key.key(), it.value.toList()) }
                    .toList()
            punctuateStore.putAll(newestMeasurementValuesByLocation)
        }
    }

    // Forward one record per location, containing all measurement values of the newest interval.
    private fun forwardMeasurements() {
        punctuateStore.all().use { iterator ->
            iterator.forEachRemaining { newestReceivedInterval ->
                val storedIntervalRecords = newestReceivedInterval.value.map { it.unStorify(storeSerde) }
                val measurementValues = storedIntervalRecords.map { it.value() }
                val recordTimestamp = measurementValues.maxBy { it.dateTime }.dateTime
                val recordToForward =
                    Record(
                        storedIntervalRecords.last().value().mrid,
                        measurementValues,
                        recordTimestamp,
                        storedIntervalRecords.last().headers(),
                    )
                context().forward(recordToForward)
            }
        }
    }
}
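For reference, the processor is wired into the topology roughly like this. This is a simplified sketch assuming Kafka Streams 3.3+ (where process() returns a KStream); the topic names, store names, retention/window sizes, the serdes storedRecordsSerde, avroSerde, and avroValuesSerde, and processorConfig are placeholders rather than my actual values:

import java.time.Duration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.processor.api.ProcessorSupplier
import org.apache.kafka.streams.state.Stores

fun buildTopology(): Topology {
    val builder = StreamsBuilder()
    val groupStoreName = "group-store"          // placeholder store name
    val punctuateStoreName = "punctuate-store"  // placeholder store name

    // Windowed store in which the processor groups records per location and interval.
    builder.addStateStore(
        Stores.windowStoreBuilder(
            Stores.persistentWindowStore(groupStoreName, Duration.ofMinutes(5), Duration.ofSeconds(10), false),
            Serdes.String(),
            storedRecordsSerde, // assumed serde for the stored lists of KafkaRecordWithHeaders
        ),
    )
    // Key-value store holding the newest interval that is forwarded on punctuation.
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(punctuateStoreName),
            Serdes.String(),
            storedRecordsSerde,
        ),
    )

    builder.stream("input-topic", Consumed.with(Serdes.String(), avroSerde))
        .filter { _, value -> value != null } // the stateless filter steps
        .process(
            ProcessorSupplier<String, AvroObject, String, List<AvroObject>> {
                PunctuateProcessor(processorConfig, groupStoreName, punctuateStoreName, avroSerde)
            },
            groupStoreName,
            punctuateStoreName,
        )
        .to("output-topic", Produced.with(Serdes.String(), avroValuesSerde)) // assumed serde for List<AvroObject>

    return builder.build()
}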
I would expect Kafka Streams to work fine “out of the box” and not throw exceptions when all the default configurations are used. Why am I experiencing these exceptions?
The exceptions cause the application to stop producing data to the output topic for 1–2 minutes while the producer is restarted. This is unfortunately not acceptable in my business case.