Stream does not receive messages from topic

Hi Community,

I create a stream with the following ksqlDB Statement:

CREATE STREAM my_stream(
  timestamp string,
  PAYLOAD STRUCT<
  name string,
  unixTimestamp string,
  value string,
  unit string,
  metricType string>) WITH (KAFKA_TOPIC='my-topic', VALUE_FORMAT='JSON')

There will be published messages by an external system to the Kafka topic my-topic. I can see the messages by the Confluent Control Center. But the messages will be not streamed into the stream my_stream. When I publish a message to the my-topic topic manually by the control center UI (with the same content like the messages before), I can see the message in the stream.

Could somebody explain this behavior?

Thank you in advance!

BR
Simon

Now I saw in the ksqldb logs die following error message:

,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_MES_TEST_8810051620230793626.KsqlTopic.Source.deserializer)
[2022-10-28 07:36:45,890] WARN stream-thread [_confluent-ksql-default_transient_transient_MES_TEST_8810051620230793626_1666942430127-e648e220-8caa-4364-83b6-d83ecd05e4a7-StreamThread-3] task [0_2] Skipping 
record due to deserialization error. topic=[mes_test-topic] partition=[2] offset=[5] (org.apache.kafka.streams.processor.internals.RecordDeserializer)
org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: mes_test-topic. Invalid UTF-32 character 0x17a2274 (above 0x0010ffff) at char #1, byte #7)
        at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:145)
        at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:225)
        at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:204)
        at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)
        at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61)
        at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:303)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
        at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1000)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:914)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x17a2274 (above 0x0010ffff) at char #1, byte #7)

It only occurs when there will be published messages to kafka by the texernal system. But I donĀ“t understand the reason for the serializer exception.

Does anybody have an idea?

BR
Simon

This topic was automatically closed after 30 days. New replies are no longer allowed.