I have a Kafka Streams application that builds a client-side KTable and aggregates events from a topic until the end of the topic is reached.
The application streams that KTable through a custom processor that commits each aggregated event to a relational database. However, the offsets never move forward, even though I have configured the stream for EXACTLY_ONCE processing.
I have tried committing the offset in the process method of the custom processor, which results in the following exception:
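For reference, a trimmed-down sketch of the processor (the class name and the database-write helper are simplified placeholders, and this uses the newer Processor API; the real JDBC code is omitted):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class OdsCommitProcessor implements Processor<GenericRecord, GenericRecord, Void, Void> {

    private ProcessorContext<Void, Void> context;

    @Override
    public void init(ProcessorContext<Void, Void> context) {
        this.context = context;
    }

    @Override
    public void process(Record<GenericRecord, GenericRecord> record) {
        // Write the aggregated event to the relational database (JDBC details elided)
        writeToDatabase(record.key(), record.value());
        // Request a commit after the write; this is where the epoch error shows up
        context.commit();
    }

    private void writeToDatabase(GenericRecord key, GenericRecord value) {
        // upsert elided for brevity
    }
}
```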
22:36:44.227 [kafka-producer-network-thread | sonata-ods-streams-c2c7a9c8-5dfc-467c-a113-4a45051b4c9c-StreamThread-1-0_0-producer] ERROR o.a.k.s.p.i.RecordCollectorImpl - stream-thread [sonata-ods-streams-c2c7a9c8-5dfc-467c-a113-4a45051b4c9c-StreamThread-1] task [0_0] Error encountered sending record to topic sonata-ods-streams-bravura.transactionSegment-repartition for task 0_0 due to: org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch. Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out
My configuration for the POC is as follows:
final Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "ods-streams");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "server01:9092");
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
settings.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://server01:8081");
settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 50 * 1024 * 1024L); // 50MB
settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000); // 30s
settings.put(StreamsConfig.POLL_MS_CONFIG, 400);
Can someone please help:
- Why am I getting an epoch error? Is a producer always created implicitly? I assume it is used to create a Kafka transaction for exactly-once processing.
- Why am I getting an epoch error when only a single thread is connected to the KTable (albeit on a topic with multiple partitions)?
- How do I correct this error? Do I need to obtain the producer and invoke a method on it, or change the configuration?
- Should I be explicitly committing offsets in my custom processor (via processorContext.commit())? I am hoping that a failure to commit will cause the aggregated event to be re-processed (to handle expected SQL exceptions), while a successful commit will move on to the next aggregated event.
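To make that last point concrete, this is the commit-on-success semantics I'm after, sketched as plain Java with stand-ins (the Dao interface and handle method are illustrative placeholders, not my real code; in the real processor the "commit" would be processorContext.commit()):

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Stand-in for the per-record flow I want: advance the offset only
// after a successful DB write; on a SQL failure, do not commit so
// the aggregated event is retried.
public class CommitOnSuccess {

    interface Dao { void upsert(String event) throws SQLException; }

    final List<String> committed = new ArrayList<>();

    // Returns true if the event was written and its offset should advance.
    boolean handle(Dao dao, String event) {
        try {
            dao.upsert(event);
            committed.add(event);     // would be processorContext.commit()
            return true;
        } catch (SQLException e) {
            return false;             // no commit -> event reprocessed later
        }
    }

    public static void main(String[] args) {
        CommitOnSuccess p = new CommitOnSuccess();
        Dao flaky = event -> {
            if (event.startsWith("bad")) throw new SQLException("constraint violation");
        };
        System.out.println(p.handle(flaky, "good-event"));  // true
        System.out.println(p.handle(flaky, "bad-event"));   // false
    }
}
```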
Thanks in advance.