Kafka Stream producer exception handling

We are using Java Spring Kafka Streams, which receives messages from the Kafka topic TOPIC_1 and performs some transformation. The transformed message is sent to another Kafka topic (TOPIC_2), which expects AVRO messages. If there is an AVRO serialization error while publishing to TOPIC_2, the offset is never committed and the application runs in an infinite loop. We have tried StreamBuilderFactory.setStreamUnCaughtExceptionHandler and StreamBuilderFactory.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, but neither worked.

Serialization exception scenario:
AVRO message
name: price
type: ["null", {"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 2}]

If we send price with the value 989.9221 to TOPIC_2 (four decimal places, while the schema allows a scale of 2), publishing to the topic throws an error, which results in the offset not being committed.
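To make the failure concrete, here is a minimal sketch of the kind of check a decimal(5, 2) schema implies. The class `DecimalFit` and its `fits` method are hypothetical helpers written for illustration, not part of Avro or Kafka, and the exact validation an Avro serializer performs may differ by version.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper (not an Avro or Kafka API): tests whether a value
// can be represented as decimal(precision, scale) without rounding.
final class DecimalFit {

    static boolean fits(BigDecimal value, int precision, int scale) {
        try {
            // UNNECESSARY makes setScale throw if non-zero digits would be dropped.
            BigDecimal rescaled = value.setScale(scale, RoundingMode.UNNECESSARY);
            return rescaled.precision() <= precision;
        } catch (ArithmeticException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        BigDecimal price = new BigDecimal("989.9221");
        // Scale 4 cannot be stored as scale 2 without rounding.
        System.out.println(fits(price, 5, 2)); // prints "false"

        // Rounding explicitly before serialization is one way to make the value fit.
        BigDecimal rounded = price.setScale(2, RoundingMode.HALF_UP);
        System.out.println(rounded + " " + fits(rounded, 5, 2)); // prints "989.92 true"
    }
}
```

Whether rounding in the transformation step is acceptable depends on the data; it only avoids this particular error and does not replace proper handling of serialization failures.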

What is the best way to handle this kind of error?

As you correctly observed, the ProductionExceptionHandler does not cover serialization errors. This will be fixed in the upcoming Apache Kafka 3.5 release (cf. KIP-399).

For now (i.e., before the 3.5 release), the only way to address it in your code is to add a flatMap() before calling to() and do the serialization manually inside flatMap(). That is, flatMap() takes <K,V> in and emits <byte[],byte[]>, and the Serde passed to to() is then a plain byte-array Serde (Serdes.ByteArray(), or Serdes.Bytes() if you emit Bytes instead of byte[]). Inside the flatMap() function, you can add a try-catch, and if an error occurs, let flatMap() emit zero records instead of the serialized one. The bad record is then dropped, processing continues, and the offset can be committed.
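A minimal sketch of the "emit zero records on failure" idea follows. The class `SafeSerialization` and its `serializeOrSkip` method are hypothetical names chosen for illustration, and the serializer is modeled as a plain `Function` so the snippet runs without Kafka on the classpath. The wiring shown in the comment is an assumption about how it might be plugged into a topology; it uses flatMapValues(), a variation that serializes only the value and leaves the key to the Serde passed to to().

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper (not a Kafka Streams API).
final class SafeSerialization {

    // Returns a one-element list with the serialized bytes,
    // or an empty list if the serializer throws.
    static <V> List<byte[]> serializeOrSkip(Function<V, byte[]> serializer, V value) {
        try {
            return Collections.singletonList(serializer.apply(value));
        } catch (RuntimeException e) {
            // Log and drop the record; a real application might route it to a dead-letter topic.
            System.err.println("Skipping record that failed serialization: " + e);
            return Collections.emptyList();
        }
    }

    // Possible wiring in a topology (assumed names: stream, keySerde, avroSerializer):
    //
    //   stream.flatMapValues(v -> SafeSerialization.serializeOrSkip(
    //                 x -> avroSerializer.serialize("TOPIC_2", x), v))
    //         .to("TOPIC_2", Produced.with(keySerde, Serdes.ByteArray()));

    public static void main(String[] args) {
        // Stand-in serializer that rejects some inputs, mimicking a schema violation.
        Function<String, byte[]> strict = s -> {
            if (s.length() > 6) {
                throw new IllegalArgumentException("value does not fit schema: " + s);
            }
            return s.getBytes(StandardCharsets.UTF_8);
        };

        System.out.println(serializeOrSkip(strict, "989.92").size());   // prints "1"
        System.out.println(serializeOrSkip(strict, "989.9221").size()); // prints "0"
    }
}
```

If key serialization can also fail, the same pattern applies inside flatMap() with both key and value serialized manually, as described above.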
