Hello! I’m putting together a ksqlDB proof of concept (POC) for my colleagues, but I’ve run into a hurdle. If I can get past it, the POC shows that ksqlDB can do everything we need.
I have two simple, normalised streams:
create stream orders (
id varchar,
customerId varchar
) WITH (
kafka_topic='orders',
value_format='JSON'
);
create stream lines (
id varchar,
orderId varchar,
productId varchar,
quantity varchar
) WITH (
kafka_topic='lines',
value_format='JSON'
);
Each line has a foreign key, orderId, that references the id (primary key) of its order. I produced these sample events:
orders topic:
{"id":"o12", "customerId": "c54"}
lines topic:
{"id": "l24", "orderId": "o12", "productId": "p20", "quantity": "4"}
{"id": "l25", "orderId": "o12", "productId": "p92", "quantity": "1"}
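For anyone who wants to reproduce this from the ksqlDB CLI, the same sample events can be written with INSERT INTO ... VALUES statements instead of a separate Kafka producer. This is just one way to create them, not necessarily how the events above were produced; note that ksqlDB serialises unquoted column names in upper case (ID, CUSTOMERID, ...), which it reads back into the same columns.

```sql
-- Write the sample order to the orders topic via the orders stream.
INSERT INTO orders (id, customerId) VALUES ('o12', 'c54');

-- Write the two sample lines that reference order o12.
INSERT INTO lines (id, orderId, productId, quantity) VALUES ('l24', 'o12', 'p20', '4');
INSERT INTO lines (id, orderId, productId, quantity) VALUES ('l25', 'o12', 'p92', '1');
```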
My goal is to create a stream that produces events with this value structure:
{
"id":"o12",
"customerId": "c54",
"lines": [
{"id": "l24","productId": "p20", "quantity": "4"},
{"id": "l25", "productId": "p92", "quantity": "1"}
]
}
It seems I’d need to do this in two stages: first aggregate the lines that share an orderId, then enrich the orders stream with the aggregated lines. My hurdle is the aggregation. I’ve seen a couple of examples online of people aggregating structs with the collect_list function (this post, for example), but when I try it, ksqlDB reports:
Function 'collect_list' does not accept parameters (STRUCT<ID STRING, PRODUCTID STRING, QUANTITY STRING>)
This is the query I tried:
select
orderId,
collect_list (struct(
id := lines.id,
productId := lines.productId,
quantity := lines.quantity
)) as lines
from lines
group by orderId
emit changes;
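Written out in full, the two-stage plan would look roughly like the following untested sketch. The names lines_by_order and orders_with_lines are made up for illustration, and stage 1 is just the query above, so it only works on a ksqlDB version whose COLLECT_LIST accepts STRUCT arguments. Two caveats: a stream-table join looks up the table at the moment each order event arrives, so an order produced before its lines (as in the sample events) would come out with lines set to NULL; and unquoted identifiers are upper-cased, so the output fields would be ID, CUSTOMERID and LINES rather than the camelCase names in the target structure unless the aliases are quoted with backticks.

```sql
-- Stage 1 (untested): one row per orderId holding that order's lines.
-- Assumes COLLECT_LIST accepts STRUCT values in the ksqlDB version used;
-- on the version that produced the error above, this statement fails.
CREATE TABLE lines_by_order AS
  SELECT
    orderId,
    COLLECT_LIST(STRUCT(
      id := lines.id,
      productId := lines.productId,
      quantity := lines.quantity
    )) AS lines
  FROM lines
  GROUP BY orderId
  EMIT CHANGES;

-- Stage 2 (untested): enrich each order event with its aggregated lines.
-- LEFT JOIN keeps orders that have no aggregated lines yet (lines = NULL).
CREATE STREAM orders_with_lines AS
  SELECT
    o.id AS id,
    o.customerId AS customerId,
    l.lines AS lines
  FROM orders o
  LEFT JOIN lines_by_order l ON o.id = l.orderId
  EMIT CHANGES;
```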
I’ve tried many different experiments. The closest I got was casting the struct to a varchar, but that only produces each struct’s string representation rather than nested JSON objects:
{"LINES": [
"Struct{ID=l24,PRODUCTID=p20,QUANTITY=4}",
"Struct{ID=l25,PRODUCTID=p92,QUANTITY=1}"
]}
Is what I’m trying to achieve possible using another method?