Hi there:
We are now doing an experiment using ksqlDB. We have a stream to hold all the data and build a ktable to get the latest record with Max(rowtime). Then, we want to join the stream and the ktable to get the latest results. However, when a new record come to kstream, the ktable did changed while the join results show both previous results and current results. Can anyone help us with that? Thanks a lot
The script we use for stream:
CREATE STREAM STEP_A_1 (
id VARCHAR KEY,
firstName VARCHAR,
listA ARRAY<STRUCT<id VARCHAR, data VARCHAR>>,
listB ARRAY<STRUCT<id VARCHAR, numbers BIGINT>>
) WITH (KAFKA_TOPIC = ‘STEP_A_1’,
VALUE_FORMAT = ‘JSON’,
PARTITIONS = 1);
CREATE STREAM step_a_2 AS SELECT ID,FIRSTNAME, EXPLODE(LISTA) AS A,LISTB AS B FROM step_a_1 emit changes ;
CREATE STREAM step_a_3 AS SELECT ID,FIRSTNAME, A ,explode(B) as B FROM STEP_a_2 emit changes ;
CREATE STREAM step_a_4 AS SELECT ID+‘’+A->ID+‘’+b->ID as uniqueid,ID,FIRSTNAME, A ,B FROM STEP_a_3 emit changes;
max(rowtime) for ktable:
create table step_b1 as select ID, max(ROWTIME) as time from step_a_4 group by ID emit changes;
get the results :
select a.UNIQUEID, a.FIRSTNAME, a.ID, a.rowTime, b.time from step_a_4 a inner join step_b1 b on a.ID = b.ID where a.ROWTIME = b.time EMIT CHANGES;
insert two records:
INSERT INTO step_a_1 (id, firstName, listA, listB) VALUES (
‘a1’, ‘apple’,
ARRAY[
STRUCT(id := ‘b’, data := ‘something’),
STRUCT(id := ‘a’, data := ‘blah’)
],
ARRAY[
STRUCT(id := ‘x’, numbers := 3),
STRUCT(id := ‘y’, numbers := 4)
]
);
INSERT INTO step_a_1 (id, firstName, listA, listB) VALUES (
‘a1’, ‘apple’,
ARRAY[
STRUCT(id := ‘b’, data := ‘something’)
],
ARRAY[
STRUCT(id := ‘x’, numbers := 3),
STRUCT(id := ‘y’, numbers := 4),
STRUCT(id := ‘z’, numbers := 5)
]
);
the results shows:
hope someone can help us with that. Thanks a lot