Hi all,
We're working with ksqlDB 0.17.0 and would like to know how to aggregate rows that contain nested ARRAY and STRUCT values.
Here is an example:
CREATE STREAM STREAM1 (
NUMBER VARCHAR KEY,
TIME VARCHAR,
PRIORITY VARCHAR
) WITH (
kafka_topic = 'stream1',
partitions = 1,
value_format = 'avro'
);
DESCRIBE STREAM1;
Name                 : STREAM1
 Field    | Type
------------------------------------
 NUMBER   | VARCHAR(STRING)  (key)
 TIME     | VARCHAR(STRING)
 PRIORITY | VARCHAR(STRING)
INSERT INTO STREAM1 (NUMBER, TIME, PRIORITY) VALUES ('NUMBER1', '24-09-2020 17:29:57', 'High');
INSERT INTO STREAM1 (NUMBER, TIME, PRIORITY) VALUES ('NUMBER2', '25-10-2021 18:30:58', 'Low');
SELECT * FROM STREAM1 EMIT CHANGES;
+---------+---------------------+----------+
|NUMBER   |TIME                 |PRIORITY  |
+---------+---------------------+----------+
|NUMBER1  |24-09-2020 17:29:57  |High      |
|NUMBER2  |25-10-2021 18:30:58  |Low       |
SELECT NUMBER,
       ARRAY[
         STRUCT(columnId := 0, text := NUMBER),
         STRUCT(columnId := 1, text := TIME),
         STRUCT(columnId := 2, text := PRIORITY)
       ] AS dataArray
FROM STREAM1
EMIT CHANGES;
For each row, the resulting DATAARRAY looks like this:
[{COLUMNID=0, TEXT=NUMBER1}, {COLUMNID=1, TEXT=24-09-2020 17:29:57}, {COLUMNID=2, TEXT=High}]
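To make the per-row shape explicit, here is a rough Python model of the projection above. It is not ksqlDB code, just an illustration of the data shape, assuming each input row is a plain dict with the keys NUMBER, TIME and PRIORITY; the name `to_data_array` is hypothetical.

```python
from typing import Dict, List, Union

# Hypothetical model of one ksqlDB STRUCT<columnId INT, text VARCHAR> value.
# ksqlDB upper-cases unquoted field names, hence the COLUMNID / TEXT keys.
Cell = Dict[str, Union[int, str]]

# Column order mirrors the ARRAY[...] expression in the SELECT statement.
COLUMNS = ["NUMBER", "TIME", "PRIORITY"]


def to_data_array(row: Dict[str, str]) -> List[Cell]:
    """Build the per-row ARRAY<STRUCT<columnId INT, text VARCHAR>> as a list of dicts."""
    return [{"COLUMNID": i, "TEXT": row[name]} for i, name in enumerate(COLUMNS)]


row1 = {"NUMBER": "NUMBER1", "TIME": "24-09-2020 17:29:57", "PRIORITY": "High"}
print(to_data_array(row1))
# [{'COLUMNID': 0, 'TEXT': 'NUMBER1'}, {'COLUMNID': 1, 'TEXT': '24-09-2020 17:29:57'}, {'COLUMNID': 2, 'TEXT': 'High'}]
```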
Is it possible to aggregate the DATAARRAY values of all rows into an enclosing ARRAY, based on some defined criterion (the number of unread records from the topic, a time window, …), to obtain the following final structure of type ARRAY<ARRAY<STRUCT<columnId INT, text VARCHAR>>>?
[
[{COLUMNID=0, TEXT=NUMBER1}, {COLUMNID=1, TEXT=24-09-2020 17:29:57}, {COLUMNID=2, TEXT=High}],
[{COLUMNID=0, TEXT=NUMBER2}, {COLUMNID=1, TEXT=25-10-2021 18:30:58}, {COLUMNID=2, TEXT=Low}]
]
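To pin down the semantics we're after, here is a rough Python model of the two grouping criteria mentioned above: a fixed number of records, and a tumbling time window. This is not ksqlDB code and says nothing about what ksqlDB 0.17.0 supports. It assumes the per-row arrays are already built, that TIME is the cell with COLUMNID 1, and it treats TIME as a naive timestamp with no time zone. The helper names `batch_by_count`, `event_seconds` and `batch_by_tumbling_window` are hypothetical.

```python
from datetime import datetime
from typing import Dict, List, Union

# Hypothetical model of a ksqlDB STRUCT<columnId INT, text VARCHAR> and of one row's array.
Cell = Dict[str, Union[int, str]]
DataArray = List[Cell]

# Format of the TIME strings in the example rows (dd-MM-yyyy HH:mm:ss).
TIME_FORMAT = "%d-%m-%Y %H:%M:%S"
EPOCH = datetime(1970, 1, 1)


def batch_by_count(rows: List[DataArray], size: int) -> List[List[DataArray]]:
    """Group consecutive per-row arrays into batches of at most `size` rows."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def event_seconds(row: DataArray) -> int:
    """Read the TIME cell (assumed to be COLUMNID 1) as naive seconds since the epoch."""
    text = next(cell["TEXT"] for cell in row if cell["COLUMNID"] == 1)
    parsed = datetime.strptime(str(text).strip(), TIME_FORMAT)
    return int((parsed - EPOCH).total_seconds())


def batch_by_tumbling_window(
    rows: List[DataArray], window_seconds: int
) -> Dict[int, List[DataArray]]:
    """Group per-row arrays into tumbling windows keyed by window start (in seconds)."""
    if window_seconds <= 0:
        raise ValueError("window_seconds must be positive")
    windows: Dict[int, List[DataArray]] = {}
    for row in rows:
        ts = event_seconds(row)
        start = ts - ts % window_seconds  # align to the start of the enclosing window
        windows.setdefault(start, []).append(row)
    return windows


rows: List[DataArray] = [
    [{"COLUMNID": 0, "TEXT": "NUMBER1"}, {"COLUMNID": 1, "TEXT": "24-09-2020 17:29:57"}, {"COLUMNID": 2, "TEXT": "High"}],
    [{"COLUMNID": 0, "TEXT": "NUMBER2"}, {"COLUMNID": 1, "TEXT": "25-10-2021 18:30:58"}, {"COLUMNID": 2, "TEXT": "Low"}],
]

# A batch size of 2 yields exactly the nested structure shown above.
print(batch_by_count(rows, 2) == [rows])  # True
```

With both example rows in one batch, the result matches the target structure; with a one-hour tumbling window, the two rows (about a year apart) end up in separate windows.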
Unfortunately, the COLLECT_LIST function only works with primitive types, so we can't apply it to DATAARRAY directly.
Regards,