I created a stream with the following ksqlDB statement:
CREATE STREAM my_stream (
    timestamp STRING,
    PAYLOAD STRUCT<
        name STRING,
        unixTimestamp STRING,
        value STRING,
        unit STRING,
        metricType STRING>
) WITH (KAFKA_TOPIC='my-topic', VALUE_FORMAT='JSON');
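With this schema and VALUE_FORMAT='JSON', a matching message looks like the following (example values, not the real payload):

{
    "timestamp": "2022-10-28T07:36:45Z",
    "PAYLOAD": {
        "name": "some-metric",
        "unixTimestamp": "1666942605",
        "value": "42.0",
        "unit": "kW",
        "metricType": "GAUGE"
    }
}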
An external system publishes messages to the Kafka topic my-topic. I can see these messages in Confluent Control Center, but they are not streamed into my_stream. However, when I publish a message to my-topic manually via the Control Center UI (with the same content as the earlier messages), that message does show up in the stream.
I then found the following error message in the ksqlDB logs:
,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_MES_TEST_8810051620230793626.KsqlTopic.Source.deserializer)
[2022-10-28 07:36:45,890] WARN stream-thread [_confluent-ksql-default_transient_transient_MES_TEST_8810051620230793626_1666942430127-e648e220-8caa-4364-83b6-d83ecd05e4a7-StreamThread-3] task [0_2] Skipping
record due to deserialization error. topic=[mes_test-topic] partition=[2] offset=[5] (org.apache.kafka.streams.processor.internals.RecordDeserializer)
org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: mes_test-topic. Invalid UTF-32 character 0x17a2274 (above 0x0010ffff) at char #1, byte #7)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:145)
at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:225)
at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:204)
at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)
at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61)
at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:303)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1000)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:914)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x17a2274 (above 0x0010ffff) at char #1, byte #7)
The error only occurs for messages published to Kafka by the external system, but I don't understand the reason for the SerializationException.
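To narrow this down, the raw bytes of the failing record (partition 2, offset 5 of mes_test-topic, according to the log above) could be inspected with a plain byte-array consumer. This is only a sketch; the bootstrap server and group id are placeholders:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RawRecordDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "raw-record-dump");           // throwaway group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Partition and offset of the failing record, taken from the log above.
            TopicPartition tp = new TopicPartition("mes_test-topic", 2);
            consumer.assign(List.of(tp));
            consumer.seek(tp, 5);

            for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(5))) {
                byte[] value = rec.value();
                StringBuilder hex = new StringBuilder();
                for (int i = 0; i < Math.min(16, value.length); i++) {
                    hex.append(String.format("%02x ", value[i]));
                }
                System.out.printf("offset=%d first bytes: %s%n", rec.offset(), hex);
                break; // only the first record is of interest
            }
        }
    }
}

A UTF-8 JSON value should start with 0x7b ('{'). If the external producer prepends anything else, for example a BOM or a Schema Registry magic byte 0x00, the JSON parser's encoding auto-detection can misread the payload as UTF-16/UTF-32, which would match the "Invalid UTF-32 character" error above.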