Ktable - Integer value issue

Hi, I am trying to create a KTable from Kafka Topic on Confluent Cloud CLI. Kafka topic contains JSON data. If one of the JSON FIELD has INTEGER data, I can’t see any data resulting out of KTable created on that topic. KSQL_PROCESSING_LOG shows serialization exception : Size of data received by LongDeserializer is not 8.

{
  "MESSAGE": null,
  "DESERIALIZATIONERRORCAUSE": null,
  "DESERIALIZATIONERRORMESSAGE": null,
  "RECORDPROCESSINGERRORCAUSE": [
    "java.lang.NullPointerException"
  ],
  "RECORDPROCESSINGERRORRECORD": "[\"processing.transient_KSQL_PROCESSING_LOG_3216184106591902114.Project\",\"ERROR\",1637859004592,{\"TYPE\":1,\"DESERIALIZATIONERROR\":null,\"RECORDPROCESSINGERROR\":{\"ERRORMESSAGE\":\"Error computing expression MESSAGE->SERIALIZATIONERROR->ERRORMESSAGE for column SERIALIZATIONERRORERRORMESSAGE with index 8\",\"RECORD\":\"[\\\"processing.transient_KSQL_PROCESSING_LOG_3216184106591902114.Project\\\",\\\"ERROR\\\",1637859004511,{\\\"TYPE\\\":1,\\\"DESERIALIZATIONERROR\\\":null,\\\"RECORDPROCESSINGERROR\\\":{\\\"ERRORMESSAGE\\\":\\\"Error computing expression MESSAGE->SERIALIZATIONERROR->RECORD for column SERIALIZATIONERRORRECORD with index 7\\\",\\\"RECORD\\\":\\\"[\\\\\\\"processing.transient_KSQL_PROCESSING_LOG_3216184106591902114.Project\\\\\\\",\\\\\\\"ERROR\\\\\\\",1637859004462,{\\\\\\\"TYPE\\\\\\\":1,\\\\\\\"DESERIALIZATIONERROR\\\\\\\":null,\\\\\\\"RECORDPROCESSINGERROR\\\\\\\":{\\\\\\\"ERRORMESSAGE\\\\\\\":\\\\\\\"Error computing expression MESSAGE->KAFKASTREAMSTHREADERROR->CAUSE for column KAFKASTREAMSTHREADERRORCAUSE with index 10\\\\\\\",\\\\\\\"RECORD\\\\\\\":\\\\\\\"[\\\\\\\\\\\\\\\"processing.transient_USERS_631838209135659341.KsqlTopic.Source.deserializer\\\\\\\\\\\\\\\",\\\\\\\\\\\\\\\"ERROR\\\\\\\\\\\\\\\",1637859004413,{\\\\\\\\\\\\\\\"TYPE\\\\\\\\\\\\\\\":0,\\\\\\\\\\\\\\\"DESERIALIZATIONERROR\\\\\\\\\\\\\\\":{\\\\\\\\\\\\\\\"TARGET\\\\\\\\\\\\\\\":\\\\\\\\\\\\\\\"key\\\\\\\\\\\\\\\",\\\\\\\\\\\\\\\"ERRORMESSAGE\\\\\\\\\\\\\\\":\\\\\\\\\\\\\\\"Error deserializing KAFKA message from topic: new_topic\\\\\\\\\\\\\\\",\\\\\\\\\\\\\\\"RECORDB64\\\\\\\\\\\\\\\":\\\\\\\\\\\\\\\"MQ==\\\\\\\\\\\\\\\",\\\\\\\\\\\\\\\"CAUSE\\\\\\\\\\\\\\\":[\\\\\\\\\\\\\\\"Size of data received by LongDeserializer is not 8\\\\\\\\\\\\\\\"],\\\\\\\\\\\\\\\"topic\\\\\\\\\\\\\\\":\\\\\\\\\\\\\\\"new_topic\\\\\\\\\\\\\\\"},\\\\\\\\\\\\\\\"RECORDPROCESSINGERROR\\\\\\\\\\\\\\\":null,\\\\\\\\\\\\\\\"PRODUCTIONERROR\\\\\\\\\\\\\\\":null,\\\\\\\\\\\\\\\"SERIALIZATIONERROR\\\\\\\\\\\\\\\":null,\\\\\\\\\\\\\\\"KAFKASTREAMSTHREADERROR\\\\\\\\\\\\\\\":null},1637859004413]\\\\\\\",\\\\\\\"CAUSE\\\\\\\":[\\\\\\\"java.lang.NullPointerException\\\\\\\"]},\\\\\\\"PRODUCTIONERROR\\\\\\\":null,\\\\\\\"SERIALIZATIONERROR\\\\\\\":null,\\\\\\\"KAFKASTREAMSTHREADERROR\\\\\\\":null},1637859004462]\\\",\\\"CAUSE\\\":[\\\"java.lang.NullPointerException\\\"]},\\\"PRODUCTIONERROR\\\":null,\\\"SERIALIZATIONERROR\\\":null,\\\"KAFKASTREAMSTHREADERROR\\\":null},1637859004511]\",\"CAUSE\":[\"java.lang.NullPointerException\"]},\"PRODUCTIONERROR\":null,\"SERIALIZATIONERROR\":null,\"KAFKASTREAMSTHREADERROR\":null},1637859004592]",
  "RECORDPROCESSINGERRORERRORMESSAGE": "Error computing expression MESSAGE->SERIALIZATIONERROR->RECORD for column SERIALIZATIONERRORRECORD with index 7",
  "SERIALIZATIONERRORCAUSE": null,
  "SERIALIZATIONERRORRECORD": null,
  "SERIALIZATIONERRORERRORMESSAGE": null,
  "PRODUCTIONERRORERRORMESSAGE": null,
  "KAFKASTREAMSTHREADERRORCAUSE": null,
  "KAFKASTREAMSTHREADERRORTHREADNAME": null,
  "KAFKASTREAMSTHREADERRORERRORMESSAGE": null
}

where data is {"id":1, "name":"abc"}

JSON data in topic with no number field is working fine with KTable pull query.
I request you to help in this case.
@rmoff

Thanks.

Size of data received by LongDeserializer is not 8.

Seems you defined a BIGINT type, but the data is just a regular INT?

What CREATE TABLE statement did you use?

JSON data in topic with no number field is working fine with KTable pull query.

Not sure what you mean by this?

For this data in topic,

CREATE TABLE user_table (id BIGINT PRIMARY KEY, name VARCHAR)
  WITH (KAFKA_TOPIC='users_topic', VALUE_FORMAT=''JSON);

I tried using INT too, it’s giving exception for that too.

If I am defining primary key to STRING, that’s working for me.

Thanks.

As you show a single JSON, I assume that the data is stored in the value of the key-value-pair. (Not sure if you are aware that Kafka uses a key-value data model).

When you say id BIGINT PRIMARY KEY, you tell ksqlDB to read the “id” column from the key of the key-value pair though. Not sure if this is really what you want? Seems you want to read the “id” field from the JSON that seems to be store in the value? It’s not possible to use a value-fields as PRIMARY KEY though.

If the key in our topic is empty, you cannot read the data as TABLE, but you would need to pre-process the data to move the primary key column into the key in the topic first. For example, you could define a STREAM input (id BIGINT, name VARCHAR) and issue a query SELECT * FROM input PARTITION BY id;, and define the TABLE using the result topic of the query.

Thanks. Let me try it

This topic was automatically closed after 30 days. New replies are no longer allowed.