How to define a ksqlDB table for this anonymous JSON message

Hello –
Problem: how to define a table schema that describes the following Kafka topic message, such that each row in the ksqlDB table has the given primary key and the row value is an array of structs, or something like that.

  • A Kafka topic whose message is a JSON array of objects:
    [{"a":1, "b":2,"c":3,"d":"some string"},{"a":1, "b":22,"c":33,"d":"some string"},{"a":1, "b":222,"c":333,"d":"some string"}, ...]

  • The topic has a key for partitioning, which in this case corresponds to the value of the “a” attribute in each object of the array. Thus raw messages look like this:

CreateTime:1623311232315        1   [{"a":1, "b":2,"c":3,"d":"some string"},{"a":1, "b":22,"c":33,"d":"some string"},{"a":1, "b":222,"c":333,"d":"some string"}, ...]
CreateTime:1623311233010        2   [{"a":2, "b":22,"c":33,"d":"some string"},{"a":2, "b":222,"c":333,"d":"some string"},{"a":2, "b":2222,"c":3333,"d":"some string"}, ...]

… and so forth. The topic and message structure are pre-existing and owned by another team, so modifying the message structure is most likely not an option.

I’ve looked at explanations for single-field schemas and wrapped vs. unwrapped values, but have not yet been able to define a ksqlDB table that successfully supports a query of this topic and its messages.

Any suggestions or comments on a good approach to solving this problem would be greatly appreciated.

Thank you

You should be able to define the table like this (not sure if I got the syntax right…):

CREATE TABLE t (
    id INT PRIMARY KEY, -- `PRIMARY KEY` tells ksqlDB to read this column from the message key;
                        -- any name other than `a` works here, since `a` would clash
                        -- with the value column below
    a INT, -- if you omit this column, you can name the primary key column `a` instead
    b INT,
    c INT,
    d VARCHAR)
WITH (kafka_topic='...', format='JSON');

This assumes that the key is serialized as JSON (i.e., a string). Cf. the “ksqlDB Serialization Formats” page in the ksqlDB documentation.

If the key is a 4-byte integer, you can tell ksqlDB about it using WITH(..., key_format='KAFKA', value_format='JSON'); instead.

Thanks mjsax for your suggestion. Unfortunately your approach isn’t working due to the message being wrapped in an array. The error is:

org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: my_message_topic. Can’t convert type. sourceType: ArrayNode, requiredType: STRUCT<…>

That’s the kicker: every JSON message on the topic is an array of objects. Worse still, the array is anonymous, i.e. it is not part of a JSON key/value pair but simply a bare JSON value. I’ve attempted to define the message as an array of structs, which is what it is, but that fails with a syntax error:

Caused by: line 3:10: extraneous input ‘<’

The sticking point seems to be how to express the table schema for an anonymous message, with each message being a single row in the table. Any suggestions here?

Thank you again

Hi @bruiser ,

You mentioned that you already looked into single-field wrapping/unwrapping. Have you tried this?

CREATE TABLE t (
	k INT PRIMARY KEY, 
	v ARRAY<STRUCT<a INT, b INT, c INT, d VARCHAR>>
) WITH (kafka_topic='...', format='JSON', wrap_single_value=false);

(where the key format may have to be adjusted based on @mjsax 's comment above)

Your suggestion, @vxia, was essentially the solution to my problem. Two things:

  • setting the wrap_single_value property to false
  • defining the otherwise anonymous array message as a k/v pair

With this, coupled with what @mjsax pointed out about defining the primary key, I was able to come up with a proper schema and begin querying the topic. Note that the partitioning key in the raw message, although it looks like an integer, actually needed to be defined as a string. So my final schema definition looks like this:

create table t (
  k string primary key,
  v array<struct<...>>
) with (
  kafka_topic='...', key_format='KAFKA', value_format='JSON', wrap_single_value=false
);

Very nice! Thanks to all the guidance and suggestions.

-bruiser-
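
As a follow-up illustration, here is a minimal sketch of how such a table might be queried. It assumes the table t with columns k and v from the final schema above, and struct fields b and d as they appear in the sample messages; the aliases item_count, first_b, and first_d are made up for this example. Recent ksqlDB versions index arrays starting at 1, while older releases were zero-based, so the index may need adjusting.

```sql
-- Sketch only: assumes table t (k, v) as defined in the final schema above,
-- with struct fields a, b, c, d as in the sample messages.
SELECT
  k,
  ARRAY_LENGTH(v) AS item_count,  -- number of objects in the message's array
  v[1]->b AS first_b,             -- field b of the first object (1-based index)
  v[1]->d AS first_d              -- field d of the first object
FROM t
EMIT CHANGES;
```

The `->` operator dereferences a struct field, so each array element can be reached without changing the upstream message format.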

If your key was serialized upstream as a string representation of a numeric value, you might be able to deserialize it as an INT in ksqlDB by using KEY_FORMAT='JSON' (or format='JSON', which applies to both the key and the value). For example:

create table t (
  k int primary key,
  v array<struct<...>>
) with (
  kafka_topic='...', format='JSON', wrap_single_value=false
);