Hi All ,
We’ve encountered the following errors in our streams applications, it occurs when we’re processing events and trying to fetch them from our kafka state stores. This typically occurs when there’s been a reboot of the pod or a streams rebalancing.
{"@timestamp":"2021-03-25T12:54:55.204+00:00","@version":"1","message":"stream-thread [test-aggregate-bar-streams-AggregateStream-6bc8a31b-5e3d-485d-a8f3-a4159d452e05-StreamThread-2] Failed to process stream task 0_5 due to the following error:","logger_name":"org.apache.kafka.streams.processor.internals.TaskManager","thread_name":"test-aggregate-bar-streams-AggregateStream-6bc8a31b-5e3d-485d-a8f3-a4159d452e05-StreamThread-2","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_5, processor=engagements-source, topic=engagement, partition=5, offset=17956, stacktrace=org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 96361
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema 96361 not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:295)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:355)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:707)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:689)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:225)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:299)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:284)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:291)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:280)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:328)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:305)
at com.company.foo.bar.etl.streams.transformer.RecordTransformer.transform(Transformer.java:147)
at com.company.foo.bar.etl.streams.transformer.RecordTransformer.transform(Transformer.java:37)
at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:720)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 96361
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema 96361 not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:295)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:355)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:707)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:689)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:225)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:299)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:284)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:291)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:280)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:328)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:305)
at com.company.foo.bar.etl.streams.transformer.RecordTransformer.transform(Transformer.java:147)
at com.company.foo.bar.etl.streams.transformer.RecordTransformer.transform(Transformer.java:37)
at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
"}
When this exception is handled state store operations continue as normal, but the records we try to read and trigger the exception are essentially lost.
The schema ID looks to be very high also, If I’m correct in saying this should be an incremental value. ‘‘Error retrieving Avro unknown schema for id 96361’’
Any help would be appreciated
Declan