The results are incorrect when a stream joins a KTable

Hi there:
We are running an experiment with ksqlDB. We have a stream to hold all the data and build a ktable to get the latest record with Max(rowtime). Then we want to join the stream and the ktable to get the latest results. However, when a new record arrives in the stream, the ktable does change, but the join results show both the previous results and the current results. Can anyone help us with that? Thanks a lot.

The script we use for stream:
CREATE STREAM STEP_A_1 (
id VARCHAR KEY,
firstName VARCHAR,
listA ARRAY<STRUCT<id VARCHAR, data VARCHAR>>,
listB ARRAY<STRUCT<id VARCHAR, numbers BIGINT>>
) WITH (KAFKA_TOPIC = 'STEP_A_1',
VALUE_FORMAT = 'JSON',
PARTITIONS = 1);

CREATE STREAM step_a_2 AS SELECT ID,FIRSTNAME, EXPLODE(LISTA) AS A,LISTB AS B FROM step_a_1 emit changes ;

CREATE STREAM step_a_3 AS SELECT ID,FIRSTNAME, A ,explode(B) as B FROM STEP_a_2 emit changes ;

CREATE STREAM step_a_4 AS SELECT ID+''+A->ID+''+b->ID as uniqueid,ID,FIRSTNAME, A ,B FROM STEP_a_3 emit changes;

max(rowtime) for ktable:
create table step_b1 as select ID, max(ROWTIME) as time from step_a_4 group by ID emit changes;

get the results:
select a.UNIQUEID, a.FIRSTNAME, a.ID, a.rowTime, b.time from step_a_4 a inner join step_b1 b on a.ID = b.ID where a.ROWTIME = b.time EMIT CHANGES;

insert two records:
INSERT INTO step_a_1 (id, firstName, listA, listB) VALUES (
'a1', 'apple',
ARRAY[
STRUCT(id := 'b', data := 'something'),
STRUCT(id := 'a', data := 'blah')
],
ARRAY[
STRUCT(id := 'x', numbers := 3),
STRUCT(id := 'y', numbers := 4)
]
);

INSERT INTO step_a_1 (id, firstName, listA, listB) VALUES (
'a1', 'apple',
ARRAY[
STRUCT(id := 'b', data := 'something')
],
ARRAY[
STRUCT(id := 'x', numbers := 3),
STRUCT(id := 'y', numbers := 4),
STRUCT(id := 'z', numbers := 5)
]
);

the results show:

Hope someone can help us with that. Thanks a lot :)

I cannot fully follow your example. STREAM STEP_x1 is defined but never used. STREAM STEP_A from the first query is never defined (should this be STEP_x1 and it’s just a typo?). Similarly, STEP_a_2 is never defined?

build a ktable to get the latest record with Max(rowtime)

Your table query computes the max(rowtime), but that does not mean you get the “latest record”. The aggregation returns only the largest timestamp per ID, not the row that carries it. That is not how SQL works.

insert two records:

You INSERT INTO t_1, which is never defined. Also, you insert two records into t_1, so the join result should be empty (if this is all the data you have, because the other side of the join would not have any data)?

Also, in general, joins in ksqlDB are temporal and the system does not work like an RDBMS. Check out https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/ for more details.

You say “the result is incorrect”, but it’s unclear what your input data is and what your expected result should be. Can you clarify?

Hi @mjsax:
Thanks a lot for your help :) We just modified our experiment to make it work. The problem is that we use the KTable with max(rowtime) to get the latest changes, so every time a new record comes in, it should keep the latest record. But when we join the KStream (which holds all the events) with the KTable, we fail to get the latest events. That confuses us; hope it makes sense to you. Thanks

I guess the misconception is about how the query is executed.

If you have a table, it’s stored (or, to be more precise, buffered) inside the ksqlDB server using RocksDB, but the actual storage layer is Kafka, i.e., there is a table changelog stored in a Kafka topic.

When you execute the stream-table join query, the query won’t use the existing RocksDB store; instead, it will read the full table changelog from Kafka. Again, check out the talk I linked above.
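
To make the effect concrete, here is a rough Python model of the idea (it is not ksqlDB code and only approximates what the engine does). It replays a table changelog alongside the stream in timestamp order and applies the `a.ROWTIME = b.time` filter, then compares that with an RDBMS-style lookup against the final table state. The timestamps 1000 and 2000, the function names, and the rule that a table update is applied before a stream record with the same timestamp are all assumptions made for illustration; the real ordering on equal timestamps may differ.

```python
# Simplified model of a temporal stream-table join; NOT ksqlDB code.
# Assumption: both inputs are processed in timestamp order and, on a tie,
# the table update is applied before the stream record is joined.

# (rowtime, id, uniqueid) rows of step_a_4 after both EXPLODE steps.
# The rowtimes 1000 and 2000 are hypothetical.
stream = [
    (1000, "a1", "a1bx"), (1000, "a1", "a1by"),
    (1000, "a1", "a1ax"), (1000, "a1", "a1ay"),
    (2000, "a1", "a1bx"), (2000, "a1", "a1by"), (2000, "a1", "a1bz"),
]


def build_changelog(stream):
    """Model MAX(ROWTIME) ... GROUP BY ID: one table update per input row."""
    current = {}
    changelog = []
    for rowtime, key, _ in stream:
        current[key] = max(current.get(key, rowtime), rowtime)
        changelog.append((rowtime, key, current[key]))
    return changelog


def temporal_join(stream, changelog):
    """Join each stream row against the table state as of the row's time."""
    # Source tag 0 (table) sorts before 1 (stream) when timestamps are equal.
    events = [(ts, 0, key, value) for ts, key, value in changelog]
    events += [(ts, 1, key, value) for ts, key, value in stream]
    table = {}
    output = []
    for ts, source, key, value in sorted(events, key=lambda e: (e[0], e[1])):
        if source == 0:
            table[key] = value            # table side: update the state
        elif table.get(key) == ts:        # stream side: a.ROWTIME = b.time
            output.append((value, ts))
    return output


def snapshot_join(stream, changelog):
    """RDBMS-style lookup: every row is compared with the final table."""
    final = {}
    for _, key, value in changelog:
        final[key] = value
    return [(uid, ts) for ts, key, uid in stream if final.get(key) == ts]


if __name__ == "__main__":
    changelog = build_changelog(stream)
    print("temporal:", temporal_join(stream, changelog))
    print("snapshot:", snapshot_join(stream, changelog))
```

In this model the temporal join emits all seven rows (four from the first insert and three from the second), because each stream record matches the table value that was current at its own timestamp. The snapshot lookup returns only the three rows from the second insert, which is the result an RDBMS user would expect.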
