How to preserve message order after a join

Example code:

CREATE STREAM USERS_DATA_PRITE010 (KEY_STRUCT STRUCT<USERID VARCHAR> KEY, USERNAME VARCHAR, AGES INTEGER, EMAIL VARCHAR)
WITH (kafka_topic='USERS_DATA_PRITE010', value_format='AVRO', key_format='AVRO', partitions=3);

INSERT INTO USERS_DATA_PRITE010 (KEY_STRUCT, USERNAME, AGES, EMAIL) VALUES (STRUCT(USERID := 'A2'), 'USER-A-1', 1, 'userA1@abc.com');
INSERT INTO USERS_DATA_PRITE010 (KEY_STRUCT, USERNAME, AGES, EMAIL) VALUES (STRUCT(USERID := 'A2'), 'USER-A-2', 2, 'userA2@abc.com');
INSERT INTO USERS_DATA_PRITE010 (KEY_STRUCT, USERNAME, AGES, EMAIL) VALUES (STRUCT(USERID := 'A2'), 'USER-A-3', 3, 'userA3@abc.com');
INSERT INTO USERS_DATA_PRITE010 (KEY_STRUCT, USERNAME, AGES, EMAIL) VALUES (STRUCT(USERID := 'A2'), 'USER-A-4', 4, 'userA4@abc.com');
INSERT INTO USERS_DATA_PRITE010 (KEY_STRUCT, USERNAME, AGES, EMAIL) VALUES (STRUCT(USERID := 'A2'), 'USER-A-5', 5, 'userA5@abc.com');

CREATE TABLE USERS_DATA_PRITE011 (KEY_STRUCT STRUCT<USERID VARCHAR>, USERNAME VARCHAR PRIMARY KEY, AGES INTEGER, EMAIL VARCHAR)
WITH (kafka_topic='USERS_DATA_PRITE011', value_format='AVRO', key_format='AVRO', partitions=3);

INSERT INTO USERS_DATA_PRITE011 (KEY_STRUCT, USERNAME, AGES, EMAIL) VALUES (STRUCT(USERID := 'A2'), 'USER-A-1', 30, 'userA1@abc.com');
INSERT INTO USERS_DATA_PRITE011 (KEY_STRUCT, USERNAME, AGES, EMAIL) VALUES (STRUCT(USERID := 'A2'), 'USER-A-2', 30, 'userA2@abc.com');
INSERT INTO USERS_DATA_PRITE011 (KEY_STRUCT, USERNAME, AGES, EMAIL) VALUES (STRUCT(USERID := 'A3'), 'USER-A-3', 23, 'userA3@abc.com');
INSERT INTO USERS_DATA_PRITE011 (KEY_STRUCT, USERNAME, AGES, EMAIL) VALUES (STRUCT(USERID := 'A4'), 'USER-A-4', 23, 'userA4@abc.com');
INSERT INTO USERS_DATA_PRITE011 (KEY_STRUCT, USERNAME, AGES, EMAIL) VALUES (STRUCT(USERID := 'A5'), 'USER-A-5', 23, 'userA5@abc.com');

CREATE STREAM USERS_DATA_PRITE_JOIN_333 AS
SELECT
    USERS_DATA_PRITE010.KEY_STRUCT AS KEYSHOW,
    USERS_DATA_PRITE010.AGES,
    USERS_DATA_PRITE010.EMAIL
FROM USERS_DATA_PRITE010
LEFT JOIN USERS_DATA_PRITE011
    ON USERS_DATA_PRITE010.USERNAME = USERS_DATA_PRITE011.USERNAME
PARTITION BY USERS_DATA_PRITE010.KEY_STRUCT;

Assume the AGES column represents the order of the messages.
The key of the stream USERS_DATA_PRITE010 is KEY_STRUCT.
The key of the table USERS_DATA_PRITE011 is USERNAME.

In USERS_DATA_PRITE010 the messages are in order.

However, the output of joining USERS_DATA_PRITE010 with USERS_DATA_PRITE011, the stream USERS_DATA_PRITE_JOIN_333, does not keep that order.

We need the messages in USERS_DATA_PRITE_JOIN_333 to be in the same order as in USERS_DATA_PRITE010.

How can we order the messages after the join?

That's not supported in ksqlDB. You could add a post-processing step that reads the output topic and re-orders the records, using another tool such as Kafka Streams (or Flink). Kafka Streams has no built-in operator for this either, but you could implement a custom Processor.
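
To give an idea of what the re-ordering logic inside such a custom Processor might look like, here is a minimal sketch in plain Java with no Kafka dependency. It assumes that AGES is a gap-free sequence number per key that starts at a known value (1 in the example data); that is an assumption about the data, not something ksqlDB guarantees. The class name `ReorderBuffer` and the method `accept` are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper: re-orders records per key by sequence number.
 * ASSUMPTION: sequence numbers per key are gap-free and start at firstSeq.
 */
final class ReorderBuffer {
    // Per key: records that arrived too early, sorted by sequence number.
    private final Map<String, TreeMap<Integer, String>> pending = new HashMap<>();
    // Per key: the next sequence number that is allowed to be released.
    private final Map<String, Integer> nextSeq = new HashMap<>();
    private final int firstSeq;

    ReorderBuffer(int firstSeq) {
        this.firstSeq = firstSeq;
    }

    /**
     * Accepts one record and returns every value that can now be
     * forwarded downstream, in sequence order (possibly none).
     */
    List<String> accept(String key, int seq, String value) {
        List<String> ready = new ArrayList<>();
        int expected = nextSeq.getOrDefault(key, firstSeq);
        if (seq < expected) {
            // Already released this sequence number: treat as a duplicate and drop it.
            return ready;
        }
        TreeMap<Integer, String> buffer =
                pending.computeIfAbsent(key, k -> new TreeMap<>());
        buffer.put(seq, value);
        // Release the longest contiguous run that starts at the expected number.
        while (!buffer.isEmpty() && buffer.firstKey() == expected) {
            ready.add(buffer.pollFirstEntry().getValue());
            expected++;
        }
        nextSeq.put(key, expected);
        return ready;
    }
}
```

In a real Kafka Streams Processor the two maps would have to live in a state store rather than in memory, so that they survive restarts and rebalances, and `accept` would be called from `process` with the returned values forwarded downstream. Note also that a missing sequence number stalls the output for that key indefinitely, so you would probably want a timeout (for example via a punctuator) that flushes whatever is buffered.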
