Hello,
I have two streams that I want to join on a shared key, but something goes wrong and I need help understanding what I am doing wrong.
The first stream contains product information. I create a table from it, grouping by a field named KEY_DEPARTMENT_LEVEL:
CREATE TABLE `table_agg`
AS
SELECT KEY_DEPARTMENT_LEVEL,
AS_VALUE(KEY_DEPARTMENT_LEVEL) as DEPARTMENT_LEVEL,
sum(IS_NEEDED_IN_FOLLOW) as COUNT_REFS_FOLLOW,
count(*) as COUNT_REFS_TOTAL
FROM `topic_product_counted`
GROUP BY KEY_DEPARTMENT_LEVEL EMIT CHANGES;
To do a stream-stream join, I use the topic produced by the aggregation, so that I get all the changes:
CREATE STREAM `stream_for_agg`
WITH(KAFKA_TOPIC='table_agg', VALUE_FORMAT='AVRO');
I have a second stream, which is updated only once a day with some data. Its key is the same as the field used in the GROUP BY above, KEY_DEPARTMENT_LEVEL:
DESCRIBE stream2;
Field | Type
-------------------------------------------------
KEY | VARCHAR(STRING) (key)
REF_TO_FOLLOW_LEVEL | INTEGER
COUNT_REFS_TOTAL | INTEGER
COUNT_REFS_FOLLOW | INTEGER
If I join the two streams, I get null on the right side:
select agg.* ,
picture.key ,
picture.REF_TOTAL_LEVEL,
picture.COUNT_REFS_FOLLOW,
picture.COUNT_REFS_TOTAL
from `stream_for_agg` agg
LEFT JOIN `stream2` picture
WITHIN 3 DAYS
ON agg.DEPARTMENT_LEVEL = picture.KEY
WHERE agg.DEPARTMENT_LEVEL like '001_003_%'
EMIT CHANGES;
|AGG_DEPARTMENT_LE|AGG_COUNT_REFS_FO|AGG_GAP_REFS_FOLL|AGG_GAP_VALUE_REF|AGG_COUNT_REFS_TO|AGG_GAP_REFS_TOTA|AGG_GAP_VALUE_REF|KEY |REF_TOTAL_LEVEL |PICTURE_COUNT_REF|PICTURE_COUNT_REF|
|VEL |LLOW |OW |S_FOLLOW |TAL |L |S_TOTAL | | |S_FOLLOW |S_TOTAL |
|001_003_11_DEP_11|2 |0.0 |0.0 |23 |29.0 |55.0856 |null |null |null |null |
|_1609286400000_16| | | | | | | | | | |
|16457600000 | | | | | | | | | | |
But if I change the setting to read from the earliest offset:
SET 'auto.offset.reset' = 'earliest';
the join works and all rows match:
select agg.* ,
picture.key ,
picture.REF_TOTAL_LEVEL,
picture.COUNT_REFS_FOLLOW,
picture.COUNT_REFS_TOTAL
from `stream_for_agg` agg
LEFT JOIN `stream2` picture
WITHIN 3 DAYS
ON agg.DEPARTMENT_LEVEL = picture.KEY
WHERE agg.DEPARTMENT_LEVEL like '001_003_%'
EMIT CHANGES;
|AGG_DEPARTMENT_LE|AGG_COUNT_REFS_FO|AGG_GAP_REFS_FOLL|AGG_GAP_VALUE_REF|AGG_COUNT_REFS_TO|AGG_GAP_REFS_TOTA|AGG_GAP_VALUE_REF|KEY |REF_TOTAL_LEVEL |PICTURE_COUNT_REF|PICTURE_COUNT_REF|
|VEL |LLOW |OW |S_FOLLOW |TAL |L |S_TOTAL | | |S_FOLLOW |S_TOTAL |
|001_003_04_DEP_04|0 |0.0 |0.0 |1 |-1.0 |-3.55 |001_003_04_DEP_04|18476 |143 |147 |
|_1609286400000_16| | | | | | |_1609286400000_16| | | |
|16457600000 | | | | | | |16457600000 | | | |
|001_003_04_DEP_04|0 |0.0 |0.0 |2 |-7.0 |-23.95 |001_003_04_DEP_04|18476 |143 |147 |
|_1609286400000_16| | | | | | |_1609286400000_16| | | |
|16457600000 | | | | | | |16457600000 | | | |
|001_003_04_DEP_04|0 |0.0 |0.0 |3 |-8.0 |-27.57 |001_003_04_DEP_04|18476 |143 |147 |
|_1609286400000_16| | | | | | |_1609286400000_16| | | |
|16457600000 | | | | | | |16457600000 | | | |
The records in stream2 have timestamps within the last 3 days, so I don't understand.
What could be the reason?
Thanks for your help.
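For reference, here is a minimal sketch of how the record timestamps on both sides of the join could be checked. It relies on the ROWTIME pseudocolumn and the built-in TIMESTAMPTOSTRING function. It assumes neither stream was declared with a custom TIMESTAMP property, and the `picture` alias, the EVENT_TIME column name and the LIMIT of 10 are only illustrative choices.

```sql
-- Read both topics from the beginning so existing records are shown.
SET 'auto.offset.reset' = 'earliest';

-- Right side of the join: event time of each daily record.
SELECT picture.KEY,
       picture.ROWTIME,
       TIMESTAMPTOSTRING(picture.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS EVENT_TIME
FROM `stream2` picture
EMIT CHANGES
LIMIT 10;

-- Left side of the join: event time of each aggregation change.
SELECT agg.DEPARTMENT_LEVEL,
       agg.ROWTIME,
       TIMESTAMPTOSTRING(agg.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS EVENT_TIME
FROM `stream_for_agg` agg
EMIT CHANGES
LIMIT 10;
```

Comparing the two EVENT_TIME columns for the same key would show whether the records really fall within the 3-day join window of each other.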