Aggregation for complex structures

Hi all,
We are working with ksqlDB 0.17.0 and would like to know how to aggregate rows that contain nested ARRAY and STRUCT values.
An example is below:

CREATE STREAM STREAM1 (
  NUMBER VARCHAR KEY,
  TIME VARCHAR,
  PRIORITY VARCHAR
) WITH (
  kafka_topic = 'stream1',
  partitions = 1,
  value_format = 'avro'
);
DESCRIBE STREAM1;

Name                 : STREAM1
 Field                 | Type
------------------------------------------------
 NUMBER                | VARCHAR(STRING)  (key)
 TIME                  | VARCHAR(STRING)
 PRIORITY              | VARCHAR(STRING)

INSERT INTO STREAM1 (NUMBER, TIME, PRIORITY) VALUES ('NUMBER1', '24-09-2020 17:29:57', 'High');
INSERT INTO STREAM1 (NUMBER, TIME, PRIORITY) VALUES ('NUMBER2', '25-10-2021 18:30:58', 'Low');

SELECT * FROM STREAM1 EMIT CHANGES;

+------------------------------------------------------+------------------------------------------------------+------------------------------------------------------+
|NUMBER                                                |TIME                                                  |PRIORITY                                              |
+------------------------------------------------------+------------------------------------------------------+------------------------------------------------------+
|NUMBER1                                               |24-09-2020 17:29:57                                   |High                                                  |
|NUMBER2                                               |25-10-2021 18:30:58                                   |Low                                                   |


SELECT NUMBER, 
 ARRAY[
   STRUCT(columnId := 0, text := NUMBER), 
   STRUCT(columnId := 1, text := TIME ), 
   STRUCT(columnId := 2, text := PRIORITY ) 
] AS dataArray FROM STREAM1 EMIT CHANGES;

The DATAARRAY produced for each row looks like this:

[{COLUMNID=0, TEXT=NUMBER1}, {COLUMNID=1, TEXT=24-09-2020 17:29:57}, {COLUMNID=2, TEXT=High}]

Would it be possible to aggregate the content of all rows into an outer ARRAY, based on some defined criterion (for example, the number of unread rows in the topic, or a window),
to obtain this final structure (ARRAY<ARRAY<STRUCT<columnId INT, text VARCHAR>>>)?

[
[{COLUMNID=0, TEXT=NUMBER1}, {COLUMNID=1, TEXT=24-09-2020 17:29:57}, {COLUMNID=2, TEXT=High}], 
[{COLUMNID=0, TEXT=NUMBER2}, {COLUMNID=1, TEXT=25-10-2021 18:30:58}, {COLUMNID=2, TEXT=Low}]
]

Unfortunately, the COLLECT_LIST function only works with basic types.

Regards,


Hi Daniel,

I wonder if ksqlDB’s lambda functions might do what you need. Take a look at this guide in the ksqlDB documentation: “How to transform columns with structured data”.

If you’ve already tried that and it’s not what you need, post back and we’ll keep the conversation going.
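
For reference, here is a minimal sketch of what a lambda looks like against the STREAM1 definition above. The choice of UCASE and the alias UPPER_VALUES are only illustrative assumptions; the point is that TRANSFORM applies the lambda to each element of an array that belongs to a single row.

```sql
-- Sketch only: the lambda runs once per element of the array built from ONE row.
-- UPPER_VALUES is a hypothetical alias chosen for this example.
SELECT NUMBER,
       TRANSFORM(ARRAY[TIME, PRIORITY], x => UCASE(x)) AS UPPER_VALUES
FROM STREAM1
EMIT CHANGES;
```

Each output row is derived from exactly one input row, so a lambda by itself does not combine values across rows.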

Thanks,
Dave

Thanks Dave,
I’ve looked into lambdas, but so far I couldn’t achieve what I wanted with them: lambdas operate on the complex structure within each row, whereas I need to aggregate data from the same column across rows (much like COLLECT_LIST does).
Regards,
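
One possible workaround, offered as an untested sketch rather than a confirmed solution: since COLLECT_LIST accepts VARCHAR, each row could be flattened into a delimited string and collected per window. The '|' delimiter, the one-minute tumbling window, the grouping by PRIORITY, and the alias ROWS_AS_TEXT are all assumptions made for illustration.

```sql
-- Sketch only: flatten each row to a VARCHAR so COLLECT_LIST can aggregate it.
-- Assumes '|' never appears in the column values.
SELECT PRIORITY,
       COLLECT_LIST(NUMBER + '|' + TIME + '|' + PRIORITY) AS ROWS_AS_TEXT
FROM STREAM1
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY PRIORITY
EMIT CHANGES;
```

This yields an ARRAY<VARCHAR> per group and window, not the ARRAY<ARRAY<STRUCT<...>>> asked for, so a consumer (or a later per-row step) would still have to split each element back into its columns.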

