LogAndContinueExceptionHandler not working

The streams application shuts down after receiving the below de-SerializationException instead of logging and continue

2023-12-14 08:01:34,093 IST ERROR [sequencer_traps-acacd3ca-10c2-469a-9fcf-975e6d6a9c36-StreamThread-1] StreamThread run - stream-thread [sequencer_traps-acacd3ca-10c2-469a-9fcf-975e6d6a9c36-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor SequenceVerifier
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:97) ~[fm-sequencer-1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:545) ~[fm-sequencer-1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:271) ~[fm-sequencer-1.0.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.transitionToRunning(AssignedTasks.java:202) ~[fm-sequencer-1.0.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.updateRestored(AssignedStreamsTasks.java:104) ~[fm-sequencer-1.0.jar:?]
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:335) ~[fm-sequencer-1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:889) ~[fm-sequencer-1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) ~[fm-sequencer-1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788) [fm-sequencer-1.0.jar:?]

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Invalid negative byte ffffff8c at end of VInt
Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid negative byte ffffff8c at end of VInt
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477) ~[fm-sequencer-1.0.jar:?]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:755) ~[fm-sequencer-1.0.jar:?]
at com.fasterxml.jackson.dataformat.avro.deser.JacksonAvroParserImpl._reportInvalidNegative(JacksonAvroParserImpl.java:1122) ~[fm-sequencer-1.0.jar:?]
at com.fasterxml.jackson.dataformat.avro.deser.JacksonAvroParserImpl.decodeInt(JacksonAvroParserImpl.java:252) ~[fm-sequencer-1.0.jar:?]
at com.fasterxml.jackson.dataformat.avro.deser.JacksonAvroParserImpl.decodeString(JacksonAvroParserImpl.java:571) ~[fm-sequencer-1.0.jar:?]
at com.fasterxml.jackson.dataformat.avro.deser.JacksonAvroParserImpl.decodeStringToken(JacksonAvroParserImpl.java:565) ~[fm-sequencer-1.0.jar:?]

From the stack trace, it seems the issue happen during initialization.

The de-serialization exception handler only applies to input record deserialization, but not to other places.

Can it be that some issue happens inside Processor#init(), ie, your own code? For this case, you would need to add your own exception handling to init().

Thanks for the reply…

This exception comes while initializing(restoring) the states store.
@mjsax I am currently using kafka-streams 2.3.1 library

If you look into the code, the exception comes from here: kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java at da7a8db29e2073e3bc50f5a8192119af97ef7115 · apache/kafka · GitHub

KafkaStreams itself does not use jackson, so I assume the original exception is coming from kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java at da7a8db29e2073e3bc50f5a8192119af97ef7115 · apache/kafka · GitHub which calls your user code.

This exception comes while initializing(restoring) the states store.

A Topology is initialized after restoration finished (also indicated by the stack trace AssignedTasks.transitionToRunning.

Thus, I believe your implementation of https://github.com/apache/kafka/blob/da7a8db29e2073e3bc50f5a8192119af97ef7115/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java#L43 throw the JsonParseException ?

Thanks @mjsax

As part of init() method I am iterating over the values in the states store and exception is coming during that time

public void init(ProcessorContext context) {
log.debug(“Initializing SequenceVerifier with task id {}”, context.taskId());
this.context = context;
this.stateStore = (KeyValueStore<String, ActiveEvents>) context.getStateStore(eventStoreName);
KeyValueIterator<String, ActiveEvents> all = stateStore.all();
all.forEachRemaining(entry → {
if (entry.value.isScheduled()) {
previouslyScheduled.add(entry.key);
}
});
all.close(); // close to avoid leaks
this.eventStoreProxy = new StoreProxy<>(stateStore);
log.debug(“Completed initialization of SequenceVerifier with task id {}”, context.taskId());
}

is there a way I can remove the key, value from the states store when de-serialization exception comes in the init() method while iterating the state store?

is there a way I can remove the key, value from the states store when de-serialization exception comes in the init() method while iterating the state store?

That’s a little tricky… you can sure sure catch the exception and swallow it, to avoid crashing…

For cleaning it up, it might be required to put some workarounds in place – if you change the store key/value types to <byte[]/byte[]> you could side-step the deserialization, and try to deserialize “manually” in your use code, ie, create the deserializer explicitly in your own code and call deserialize(...) yourself – and if it fails, you can call delete() passing in the key as byte[].