Hello –
Problem: how do I define a table schema that describes the following Kafka topic message, such that each row in the ksqlDB table has the given primary key and the row value is an array of structs (or something similar)?
The Kafka topic's messages are JSON arrays of objects: [{"a":1, "b":2,"c":3,"d":"some string"},{"a":1, "b":22,"c":33,"d":"some string"},{"a":1, "b":222,"c":333,"d":"some string"}, ...]
The topic has a key for partitioning, which in this case corresponds to the value of the “a” attribute of each object in the array. Thus raw messages look like this:
… and so forth. The topic and message structure are pre-existing and owned by another team, so modifying the message structure is most likely not an option.
I’ve looked at explanations of single-field schemas and wrapped vs. unwrapped values, but have not yet been able to define a ksqlDB table that supports querying this topic and its messages.
Any suggestions or comments on a good approach to solving this problem would be greatly appreciated.
You should be able to define the table like this (not sure if I got the syntax right…):
CREATE TABLE t (
id INT PRIMARY KEY, -- you could use a name other than `id`, too,
-- except `a`, to avoid a naming conflict;
-- `PRIMARY KEY` tells ksqlDB to read this column from the key
a INT, -- if you omit `a`, you could name the primary key column `a` instead
b INT,
c INT,
d VARCHAR)
WITH (kafka_topic='...', format='JSON');
Thanks, mjsax, for your suggestion. Unfortunately, your approach isn’t working because the message is wrapped in an array. The error is:
org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: my_message_topic. Can’t convert type. sourceType: ArrayNode, requiredType: STRUCT<…>
That’s the kicker: every JSON message on the topic is an array of objects. Worse still, the array is anonymous, i.e. it is not the value of a named field in a JSON object but is itself the entire JSON value. I’ve attempted to define the message as an array of structs, which is what it is, but that fails with a syntax error:
Caused by: line 3:10: extraneous input ‘<’
The sticking point seems to be how to express the table schema for an anonymous message, with each message becoming a single row in the table. Any suggestions here?
You mentioned that you already looked into single-field wrapping/unwrapping. Have you tried this?
CREATE TABLE t (
k INT PRIMARY KEY,
v ARRAY<STRUCT<a INT, b INT, c INT, d VARCHAR>>
) WITH (kafka_topic='...', format='JSON', wrap_single_value=false);
(where the key format may have to be adjusted based on @mjsax’s comment above)
Your suggestion, @vxia, was essentially the solution to my problem. Two things did it:
setting the wrap_single_value property to false
defining the otherwise anonymous array message as a k/v pair, i.e. a key column k plus a single value column v that holds the whole array
With this, plus what @mjsax pointed out about defining the primary key, I was able to come up with a proper schema to begin querying the topic. Note that the partitioning key in the raw message, although it looks like an integer, actually needed to be defined as a string. So my final schema definition looks like this:
create table t (
k string primary key,
v array<struct<...>>
) with (
kafka_topic='...', key_format='KAFKA', value_format='JSON', wrap_single_value=false
);
Very nice! Thanks to all the guidance and suggestions.
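Once the table exists, the array column can be read with ksqlDB's array indexing and struct dereference (->) operators in a push query. A minimal sketch, assuming the elided struct<...> above is declared with the fields from the sample message, i.e. array<struct<a INT, b INT, c INT, d VARCHAR>>; note that ksqlDB array indexes start at 1:

-- assumes v was declared as array<struct<a INT, b INT, c INT, d VARCHAR>>
SELECT k,
  ARRAY_LENGTH(v) AS num_items, -- number of objects in this message's array
  v[1]->b AS first_b            -- field b of the first object in the array
FROM t
EMIT CHANGES;

If the struct fields were declared under different names, the names after -> have to match those declarations.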
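If your key was serialized upstream as a string representation of a numeric value, you might be able to deserialize it as an INT in ksqlDB by using KEY_FORMAT='JSON', since a digit-only string such as 123 is also a valid JSON number. For example (FORMAT='JSON' sets both the key format and the value format to JSON):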
create table t (
k int primary key,
v array<struct<...>>
) with (
kafka_topic='...', format='JSON', wrap_single_value=false
);
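If you are unsure how the upstream producer serialized the key, ksqlDB's PRINT statement can help decide between the STRING and INT definitions above: besides the records themselves, recent ksqlDB versions print their best guess at the key and value formats. A sketch using the topic name from the error message earlier in the thread (the exact output depends on your ksqlDB version):

-- show the first 3 records of the topic, starting from the earliest offset
PRINT 'my_message_topic' FROM BEGINNING LIMIT 3;

A key detected as a string format matches the STRING key column in the final schema; a digit-only string key may also be reported as valid JSON, which is the case where the KEY_FORMAT='JSON' variant with an INT column can work.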