Why am I getting InvalidProducerEpochException when no producer is used?

I have a kafka streams application that creates a client-side KTable and aggregates events from a topic until the end is reached.
The application is streaming that KTable and using a custom processor to commit the aggregated event to a relational database. However, the offsets are not moving forward, despite attempting to configure the stream for ONLY_ONCE processing.
I have tried committing the offset in the process method of custom processor. This results in the following exception:

22:36:44.227 [kafka-producer-network-thread | sonata-ods-streams-c2c7a9c8-5dfc-467c-a113-4a45051b4c9c-StreamThread-1-0_0-producer] ERROR o.a.k.s.p.i.RecordCollectorImpl - stream-thread [sonata-ods-streams-c2c7a9c8-5dfc-467c-a113-4a45051b4c9c-StreamThread-1] task [0_0] Error encountered sending record to topic sonata-ods-streams-bravura.transactionSegment-repartition for task 0_0 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out

My configuration for the POC is as follows:

            final Properties settings = new Properties();
            settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "ods-streams");
            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "server01:9092");
            settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
            settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
            settings.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://server01:8081");
            settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 50 * 1024 * 1024L); // 10MB
            settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000); // 30s
            settings.put(StreamsConfig.POLL_MS_CONFIG, 400);

Can someone please help:

  • why am I getting an epoch error? Is a producer always explicitly created? I assume this may be used to create a kafka transaction for once only processing…
  • Why when I only have a single tread connecting to the KTable (although on a topic with multiple partitions) am I getting an epoch error?
  • How to I correct this error? Do I need to obtain the producer and invoke a method on it, or change configuration?
  • Should I be explicitly committing offsets in my custom processor? ( processorContext.commit() ). I am hoping that a failure to commit will cause the aggregated event to be re-processed (to handle expected SQL exceptions), but a commit will move onto the next aggregated event.

Thanks in advance.

As you are using an aggregation, the KTable will be backed up by a changelog topic and a producer is created to write into this changelog topic.

I assume the root cause of the exception could be the large commit interval you have configured (30 sec)? For newer version of Kafka Streams, default transaction.timeout.ms is 10 sec, and thus, a transaction could time out before a commit happens.

Why did you change the TX timeout? How frequently do you commit using procescorContext.commit(): Btw: calling processorContext.commit() is only a request to commit as soon as possible. However, after commit() returned the commit did not happen yet and there is no guarantee when the commit will actually be done.

Btw: EOS only really works if you write the result back into a Kafka topic. However, as you are using a custom Processor to write the data into a database, EOS won’t work, because it does not cover side-effects (like writing to an external system).

Using EOS could still make sense for you case, as it will ensure that your aggregation is correct. However, you will need to make the writes into the database idempotent as they might get repeated and they are not guarded via EOS.

Thank you for the quick reply.
My data is sourced from a CDC tool, and we are trying to maintain transaction integrity (rather than rely on it eventually becoming consistent). We can guarantee that the data we wish to aggregate has a common topic and topic key, as well as a marker to indicate the last event in the transaction.
The aggregated topic was to combine all events for a transaction so that it appeared at the same time. Once the end marker is received and the “transaction” is processed, there is no need to ever touch this event/event key again. So the intent was to avoid writes of the aggregation back to kafka (and locally cache them all).
An earlier POC used the KafkaConsumer directly, and this worked well…up to a point. Offsets were tracked outside of kafka, but things became unstable, and we were looking for an out-of-the-box solution to manage offsets, rather than handle them directly.
The timeout was changed to reduce writes back to kafka (since I only care about offsets being maintained, not data).
I call commit on every source “transaction”, after the data has been committed to the destination.
BTW, is calling commit correct (to indicate to kafka to advance offsets), or does this also happen behind the scenes? If so, how would I mark an event for re-processing (without redirecting the failed event to a new topic)?

BTW, is calling commit correct (to indicate to kafka to advance offsets), or does this also happen behind the scenes?

It not incorrect, but not strictly necessary. Based on commit.interval.ms, committing happens regularly. Note that commit.interval.ms implies also that offsets are committed. In fact, for at-least-once processing, it’s the offset commit interval; for EOS it’s both the offset and TX commit interval (because committing offsets is part of the TX that gets committed).

If so, how would I mark an event for re-processing (without redirecting the failed event to a new topic)?

In Kafka you cannot really mark a single event for re-processing (Kafka is not a message queue…). If you want to reprocess data, using the consumer you can use seek() – For Kafka Streams, you would need to stop the application, commit the desired start offsets for the used application.id (= group.id) and start the application afterwards so it will pick up the committed offsets as starting point. – Of course, you will re-process everything from the starting offsets.

Many thanks.
In terms of reprocessing, can I reposition a KStream on a KTable, or would I need to dig into the internal offsets? This sounds like scenario that is better suited to use of a KafkaConsumer than KStream/KTable…

can I reposition a KStream on a KTable

Not sure what you mean by this?

his sounds like scenario that is better suited to use of a KafkaConsumer than KStream/KTable…

It depends, but yes, a plain KafkaConsumer is more flexible if you want to do seek() operations.

Sorry about that, yes it is a quite cryptic statement:

can I reposition a KStream on a KTable

My understanding is that a KTable will be persisted back to kafka in a log compacted topic. An offset for this KTable will be tracked for my application.id. Therefore I could close the KStream that I am using to process the KTable, and reposition the offset to re-read values (so long as no other records for this key are received in the interim, which I can guarantee).

I could also just have reprocessed the original record, but then I have limited time in which to retry before I hit the kafka producer timeout…

As a side-note, when does kafka determine the record is read successfully in a read-at-least-once scenario? Is it leveraged by the next attempt to read from the stream?

My understanding is that a KTable will be persisted back to kafka in a log compacted topic.

Yes, for each update to the RocksDB state, a write to the changelog topic happens.

An offset for this KTable will be tracked for my application.id.

Not really, because changelogs are only read on recovery. During regular processing, only writes are happening. And even during restore, offsets are not tracked via consumer group offset commit.

If you are talking about builder.table(), it’s two things: there is the input topic for which offsets will be tracked via consumer group offset commits, plus the changelog topic in which we write into.

Therefore I could close the KStream that I am using to process the KTable, and reposition the offset to re-read values (so long as no other records for this key are received in the interim, which I can guarantee)

By “KStream” you mean KafkaStreams? You cannot close a KStream. But yet, you could stop KafkaStreams and reset the input topic offset of a KTable to re-process data.

I could also just have reprocessed the original record, but then I have limited time in which to retry before I hit the kafka producer timeout…

Not sure what you mean by this.

As a side-note, when does kafka determine the record is read successfully in a read-at-least-once scenario? Is it leveraged by the next attempt to read from the stream?

A read is successful after the corresponding offset commit happend.

I was referring to catching the exception that JPA throws, and starting a new JPA transaction and reprocessing, and having a retry count to prevent a never-ending cycle…

Using Kafka Streams, you cannot directly control transactions, but it’s managed for you. If the producer throws and exception, you can also not catch it (it won’t go through your user code).

Of course, if you use a plain producer, you can to whatever you want. If there is an error, you can call abortTransaction() and re-try.

@Test
  public void SendOffset_TwoProducerDuplicateTrxId_ThrowException() {
    // create two producer with same transactional id
    Producer producer1 = KafkaBuilder.buildProducer(trxId, servers);
    Producer producer2 = KafkaBuilder.buildProducer(trxId, servers);

    offsetMap.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1000));

    // initial and start two transactions
    sendOffsetBegin(producer1);
    sendOffsetBegin(producer2);

    try {
      // when commit first transaction it expected to throw exception
      sendOffsetEnd(producer1);

      // it expects not run here
      Assert.assertTrue(false);
    } catch (Throwable t) {
      // it expects to catch the exception
      Assert.assertTrue(t instanceof ProducerFencedException);
    }
  }

  private void sendOffsetBegin(Producer producer) {
    producer.initTransactions();
    producer.beginTransaction();
    producer.sendOffsetsToTransaction(offsetMap, consumerGroup);
  }

  private void sendOffsetEnd(Producer producer) {
    producer.commitTransaction();
  }

I write a Unit test to reproduce this, from this piece of Java code, I write the same java code function in javascript and add in my https://perfectessaywriting.com/ website javascript file, and the issue got solved. You can change this code syntax according to your need, it will surely solve your issue.

@shanamendez Thanks for helping out.

I don’t think it applied to @Grasp issue though, because he is using Kafka Streams, and thus does not control the producer, but the library does the client management internally.