Aggregate into a nested JSON

Hello! I’m trying to put together a POC of ksqldb for my colleagues but I seem to have run into a hurdle. If I can overcome this hurdle then it shows ksqldb can do all we need.

I have some simple normalised streams.

create stream orders (
  id varchar,
  customerId varchar
) WITH (
  kafka_topic='orders',
  value_format='JSON'
);
create stream lines (
  id varchar,
  orderId varchar,
  productId varchar,
  quantity varchar
) WITH (
  kafka_topic='lines',
  value_format='JSON'
);

The lines schema has a foreign key for the order primary key. So I have produced these sample events.

orders topic:

{"id":"o12", "customerId": "c54"}

lines topic:

{"id": "l24", "orderId": "o12", "productId": "p20", "quantity": "4"}
{"id": "l25", "orderId": "o12", "productId": "p92", "quantity": "1"}

My goal is to create a stream that produces events with this value structure:

{
  "id":"o12",
  "customerId": "c54",
  "lines": [
    {"id": "l24","productId": "p20", "quantity": "4"},
    {"id": "l25", "productId": "p92", "quantity": "1"}
  ]
}

It seems I’d need to do this in two stages? Firstly aggregate the lines with the same orderId, then enrich the orders stream with the aggregated lines. My hurdle is with the aggregation however. I’ve seen a couple of example online of people aggregating structs with the collect_list function. This post for example. However when I try it ksqdb exclaims:

Function 'collect_list' does not accept parameters (STRUCT<ID STRING, PRODUCTID STRING, QUANTITY STRING>)

This is the query I tried:

select
  orderId,
  collect_list (struct(
    id := lines.id,
    productId := lines.productId,
    quantity := lines.quantity
  )) as lines
from lines
  group by orderId
emit changes;

I’ve tried many different experiments. The closest thing I got was to cast the struct to a varchar, but that doesn’t produce anything desirable:

{"LINES": [
  "Struct{ID=l24,PRODUCTID=p20,QUANTITY=4}",
  "Struct{ID=l25,PRODUCTID=p92,QUANTITY=1}"
]}

Is what I’m trying to achieve possible using another method?

As pointed out in the docs, collect_list currently only works on primitive types (ksqlDB Aggregate Functions - ksqlDB Documentation)

Currently only works for simple types (not Map, Array, or Struct).

It should be possible to convert the VARCHAR type into a proper STRUCT as a pre-processing step using lambdas (and maybe a UDF if necessary) in the projection clause though.

Thank you for the comment. I’ve been experimenting with a UDF to convert the casted STRUCT VARCHAR into a JSON string (json_string). This is a step closer but not ideal as I want JSON in my events, not JSON strings.

select
  orderId as "orderId",
  collect_list (json_string(cast(struct(
    "id" := lines.id,
    "productId" := lines.productId,
    "quantity" := lines.quantity
  ) as varchar))) as "lines"
from lines
  group by orderId
emit changes;

which produces

{
  "id":"o12",
  "lines": [
    "{\"id\":\"l24\",\"productId\":\"p20\",\"quantity\":\"4\"}",
    "{\"id\":\"l25\",\"productId\":\"p92\",\"quantity\":\"1\"}"
  ]
}

If I understood you correctly, I’ll try and wrap the casted VARCHAR STRUCT into a STRUCT after collect_list has processed. I’ll give a lambda a go first, hoping there’s a way to cast from VARCHAR to STRUCT easily :smiley: