Hi there,
We are building an Azure Event Hubs connector and receive messages with nested JSON bodies. We want to flatten the messages with ksqlDB. Since events keep arriving, we want to keep the latest status of each event. So we need to build a temp table that holds only the latest events, while another table holds the previous events (every time a new event arrives, the temp table should be cleared). Since ksqlDB does not provide a delete operation, we use MAX(ROWTIME) to pick out the latest events. One question: when one record is exploded into several records, they should all share the same timestamp, right? And if another event arrives and is also exploded, does that affect things?
Or is there a better way to build a temp table?
For example:
CREATE STREAM STEP_A (
id VARCHAR KEY,
firstName VARCHAR,
listA ARRAY<STRUCT<id VARCHAR, data VARCHAR>>,
listB ARRAY<STRUCT<id VARCHAR, numbers BIGINT>>
) WITH (KAFKA_TOPIC = 'step_a',
VALUE_FORMAT = 'JSON',
PARTITIONS = 1);
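(To verify the stream and the raw messages on the topic, I check with something like the following; just standard ksqlDB CLI statements:)
DESCRIBE step_a;
PRINT 'step_a' FROM BEGINNING;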
first event:
INSERT INTO step_a(id, firstName, listA, listB) VALUES (
'a1', 'apple',
ARRAY[
STRUCT(id := 'b', data := 'something'),
STRUCT(id := 'a', data := 'blah')
],
ARRAY[
STRUCT(id := 'x', numbers := 3),
STRUCT(id := 'y', numbers := 4)
]
);
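For context, since VALUE_FORMAT = 'JSON' and id is declared as KEY, my understanding is that the value payload on the step_a topic for this first event looks roughly like this ('a1' goes into the record key, and the field names are uppercased because the identifiers are unquoted):
{"FIRSTNAME": "apple", "LISTA": [{"ID": "b", "DATA": "something"}, {"ID": "a", "DATA": "blah"}], "LISTB": [{"ID": "x", "NUMBERS": 3}, {"ID": "y", "NUMBERS": 4}]}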
second event:
INSERT INTO step_a(id, firstName, listA, listB) VALUES (
'a1', 'apple',
ARRAY[
STRUCT(id := 'b', data := 'something')
],
ARRAY[
STRUCT(id := 'x', numbers := 3),
STRUCT(id := 'y', numbers := 4),
STRUCT(id := 'z', numbers := 5)
]
);
The steps to explode the arrays and build the new key:
CREATE STREAM step_a_1 AS SELECT ID, FIRSTNAME, EXPLODE(LISTA) AS A, LISTB AS B FROM step_a EMIT CHANGES;
CREATE STREAM step_a_3 AS SELECT ID, FIRSTNAME, A, EXPLODE(B) AS B FROM step_a_1 EMIT CHANGES;
CREATE STREAM step_a_4 AS SELECT ID + '_' + A->ID + '_' + B->ID AS UNIQUEID, ID, FIRSTNAME, A, B FROM step_a_3 EMIT CHANGES;
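To sanity-check the timestamp question, I watch the flattened stream with a push query like this; my assumption is that the first event explodes into 2 x 2 = 4 rows that all carry the first event's ROWTIME, and the second event into 1 x 3 = 3 rows that all carry the second event's ROWTIME:
SELECT UNIQUEID, ID, ROWTIME FROM step_a_4 EMIT CHANGES;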
Create the temp tables:
CREATE TABLE step_b1 AS SELECT ID, MAX(ROWTIME) AS LATEST_TIME FROM step_a_4 GROUP BY ID EMIT CHANGES;
CREATE TABLE step_b2 AS SELECT LATEST_BY_OFFSET(a.ID) AS ID, a.UNIQUEID FROM step_a_4 a INNER JOIN step_b1 b ON a.ID = b.ID GROUP BY a.UNIQUEID EMIT CHANGES;
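To inspect the results I run something like the following (the second one is a pull query keyed on UNIQUEID, since that is what step_b2 is grouped by; exact pull-query support depends on the ksqlDB version):
SELECT * FROM step_b1 EMIT CHANGES;
SELECT * FROM step_b2 WHERE UNIQUEID = 'a1_b_x';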
Thanks a lot