Building a temp table that holds the latest events in ksqlDB

Hi there,
We are building an Azure Event Hubs connector that receives nested JSON bodies, and we want to flatten the messages with ksqlDB. Since events keep arriving, we want to keep only the latest status of each event. So we need to build a temp table that holds the latest events, plus another table that holds the previous events (every time a new event arrives, the temp table should be cleared). Since ksqlDB does not provide a delete operation, we use MAX(ROWTIME) to identify the latest events. My question: when one record is exploded into several records, do they all share the same timestamp? And if another event arrives and is exploded at the same time, does that affect the result?
Or is there a better way to build a temp table?

For example:

CREATE STREAM STEP_A (
  id VARCHAR KEY,
  firstName VARCHAR,
  listA ARRAY<STRUCT<id VARCHAR, data VARCHAR>>,
  listB ARRAY<STRUCT<id VARCHAR, numbers BIGINT>>
) WITH (KAFKA_TOPIC = 'step_a',
VALUE_FORMAT = 'JSON',
PARTITIONS = 1);

First event:

INSERT INTO step_a(id, firstName, listA, listB) VALUES (
  'a1', 'apple',
  ARRAY[
    STRUCT(id := 'b', data := 'something'),
    STRUCT(id := 'a', data := 'blah')
  ],
  ARRAY[
    STRUCT(id := 'x', numbers := 3),
    STRUCT(id := 'y', numbers := 4)
  ]
);

Second event:

INSERT INTO step_a(id, firstName, listA, listB) VALUES (
  'a1', 'apple',
  ARRAY[
    STRUCT(id := 'b', data := 'something')
  ],
  ARRAY[
    STRUCT(id := 'x', numbers := 3),
    STRUCT(id := 'y', numbers := 4),
    STRUCT(id := 'z', numbers := 5)
  ]
);

The steps to explode the arrays and build the new key:

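-- Explode listA: one row per element of LISTA, with LISTB carried along unchanged.
CREATE STREAM step_a_1 AS SELECT ID, FIRSTNAME, EXPLODE(LISTA) AS A, LISTB AS B FROM STEP_A EMIT CHANGES;

-- Explode listB: one row per (A, element of B) pair. Reads from step_a_1 (no step_a_2 is defined).
CREATE STREAM step_a_3 AS SELECT ID, FIRSTNAME, A, EXPLODE(B) AS B FROM step_a_1 EMIT CHANGES;

-- Build a composite key from the parent ID and the two element IDs.
CREATE STREAM step_a_4 AS SELECT ID + '_' + A->ID + '_' + B->ID AS UNIQUEID, ID, FIRSTNAME, A, B FROM step_a_3 EMIT CHANGES;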
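To reason about the timestamp question, here is a minimal plain-Python model of the three CREATE STREAM steps above. It is not ksqlDB code. It assumes, based on our understanding of ksqlDB table functions, that every row EXPLODE produces from one input record keeps that record's ROWTIME. The function name `explode_event` and the rowtime values 1000 and 2000 are hypothetical.

```python
from itertools import product
from typing import Any

# Hypothetical in-memory model of STEP_A -> step_a_1 -> step_a_3 -> step_a_4.
# Not ksqlDB code: it only mimics how the two EXPLODE steps fan one event out.


def explode_event(event: dict[str, Any], rowtime: int) -> list[dict[str, Any]]:
    rows = []
    # Exploding LISTA and then B gives the cartesian product of the two arrays.
    for a, b in product(event["listA"], event["listB"]):
        rows.append({
            "uniqueid": f"{event['id']}_{a['id']}_{b['id']}",
            "id": event["id"],
            "firstName": event["firstName"],
            "a": a,
            "b": b,
            # Assumption: every derived row keeps the source event's timestamp.
            "rowtime": rowtime,
        })
    return rows


first = {
    "id": "a1", "firstName": "apple",
    "listA": [{"id": "b", "data": "something"}, {"id": "a", "data": "blah"}],
    "listB": [{"id": "x", "numbers": 3}, {"id": "y", "numbers": 4}],
}
second = {
    "id": "a1", "firstName": "apple",
    "listA": [{"id": "b", "data": "something"}],
    "listB": [{"id": "x", "numbers": 3}, {"id": "y", "numbers": 4},
              {"id": "z", "numbers": 5}],
}

# Made-up rowtimes for the two events.
rows = explode_event(first, rowtime=1000) + explode_event(second, rowtime=2000)
for r in rows:
    print(r["uniqueid"], r["rowtime"])
```

Under that assumption, the four rows from the first event (a1_b_x, a1_b_y, a1_a_x, a1_a_y) all carry rowtime 1000. The three rows from the second event (a1_b_x, a1_b_y, a1_b_z) all carry 2000. Rows from different events are distinguished only by their source timestamps.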

Create the temp table:

-- Latest ROWTIME seen for each parent ID.
CREATE TABLE step_b1 AS SELECT ID, MAX(ROWTIME) AS time FROM step_a_4 GROUP BY ID EMIT CHANGES;

-- Join the exploded rows to the latest-time table on ID, keyed by UNIQUEID.
CREATE TABLE step_b2 AS SELECT LATEST_BY_OFFSET(a.ID) AS ID, a.UNIQUEID FROM step_a_4 a INNER JOIN step_b1 b ON a.ID = b.ID GROUP BY a.UNIQUEID EMIT CHANGES;
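The intended "latest events only" behavior can be sketched in plain Python (not ksqlDB). The model keeps MAX(ROWTIME) per ID, as step_b1 does, and then retains only the exploded rows whose rowtime equals that maximum. The function name `latest_snapshot` and the rowtime values are hypothetical. The input rows mimic step_a_4 output for the two example events.

```python
# Hypothetical in-memory model of the intended step_b1 / step_b2 logic.
# Not ksqlDB code. Rows mimic step_a_4 output as (uniqueid, id, rowtime);
# the rowtime values are made up for illustration.


def latest_snapshot(rows: list[tuple[str, str, int]]) -> dict[str, str]:
    # step_b1 equivalent: MAX(ROWTIME) per parent ID.
    max_time: dict[str, int] = {}
    for _uid, parent_id, rowtime in rows:
        max_time[parent_id] = max(rowtime, max_time.get(parent_id, rowtime))
    # Keep only the exploded rows that came from the newest event for each ID.
    return {
        uid: parent_id
        for uid, parent_id, rowtime in rows
        if rowtime == max_time[parent_id]
    }


rows = [
    # Rows exploded from the first event (shared rowtime 1000).
    ("a1_b_x", "a1", 1000), ("a1_b_y", "a1", 1000),
    ("a1_a_x", "a1", 1000), ("a1_a_y", "a1", 1000),
    # Rows exploded from the second event (shared rowtime 2000).
    ("a1_b_x", "a1", 2000), ("a1_b_y", "a1", 2000), ("a1_b_z", "a1", 2000),
]

snapshot = latest_snapshot(rows)
print(sorted(snapshot))  # ['a1_b_x', 'a1_b_y', 'a1_b_z']
```

The stale rows a1_a_x and a1_a_y drop out only because of the explicit `rowtime == max_time[parent_id]` comparison. The model also relies on two events for the same ID never having identical timestamps. If they did, their exploded rows would be mixed together in the snapshot.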

Thanks a lot :slight_smile:
