Trouble with STREAM-STREAM JOIN

I have 2 streams and i want to join them using a same key but something goes wrong and i need help to undestand what i am doing wrong.

I start with a stream of product information. From it I create a KTABLE, grouping by a field named KEY_DEPARTMENT_LEVEL:

CREATE TABLE `table_agg`
AS
SELECT  KEY_DEPARTMENT_LEVEL,
        AS_VALUE(KEY_DEPARTMENT_LEVEL) as DEPARTMENT_LEVEL,
        sum(IS_NEEDED_IN_FOLLOW) as COUNT_REFS_FOLLOW,
        count(*) as COUNT_REFS_TOTAL
FROM `topic_product_counted`
GROUP BY KEY_DEPARTMENT_LEVEL EMIT CHANGES;

In order to do a stream-stream join, I use the topic produced by the aggregation to get all the changes:

CREATE STREAM `stream_for_agg`
WITH(KAFKA_TOPIC='table_agg', VALUE_FORMAT='AVRO');
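As a side note, before joining it can help to sanity-check what the changelog topic actually contains by printing it directly (the LIMIT value here is just an example):

```sql
-- Inspect the raw changelog records backing the aggregation table
PRINT 'table_agg' FROM BEGINNING LIMIT 5;
```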

I have a second stream, which is updated only once a day with some data; its key is the same field previously used in the GROUP BY, "KEY_DEPARTMENT_LEVEL":

DESCRIBE stream2;
Field                  | Type                   
-------------------------------------------------
 KEY                    | VARCHAR(STRING)  (key) 
 REF_TO_FOLLOW_LEVEL    | INTEGER                
 COUNT_REFS_TOTAL       | INTEGER                
 COUNT_REFS_FOLLOW      | INTEGER                     

If I join the two streams, I get null on the right side:

select agg.* , 
       picture.key ,
       picture.REF_TOTAL_LEVEL,
       picture.COUNT_REFS_FOLLOW,
       picture.COUNT_REFS_TOTAL
from `stream_for_agg` agg
  LEFT JOIN `stream2` picture
  WITHIN 3 DAYS
  ON agg.DEPARTMENT_LEVEL = picture.KEY
WHERE agg.DEPARTMENT_LEVEL like '001_003_%'
EMIT CHANGES;
|AGG_DEPARTMENT_LEVEL|AGG_COUNT_REFS_FOLLOW|AGG_GAP_REFS_FOLLOW|AGG_GAP_VALUE_REFS_FOLLOW|AGG_COUNT_REFS_TOTAL|AGG_GAP_REFS_TOTAL|AGG_GAP_VALUE_REFS_TOTAL|KEY|REF_TOTAL_LEVEL|PICTURE_COUNT_REFS_FOLLOW|PICTURE_COUNT_REFS_TOTAL|
|001_003_11_DEP_11_1609286400000_1616457600000|2|0.0|0.0|23|29.0|55.0856|null|null|null|null|

But if I change this setting to go back to the earliest offset:

SET 'auto.offset.reset' = 'earliest';

The join works and all rows match:

select agg.* , 
       picture.key ,
       picture.REF_TOTAL_LEVEL,
       picture.COUNT_REFS_FOLLOW,
       picture.COUNT_REFS_TOTAL
from `stream_for_agg` agg
  LEFT JOIN `stream2` picture
  WITHIN 3 DAYS
  ON agg.DEPARTMENT_LEVEL = picture.KEY
WHERE agg.DEPARTMENT_LEVEL like '001_003_%'
EMIT CHANGES;
|AGG_DEPARTMENT_LEVEL|AGG_COUNT_REFS_FOLLOW|AGG_GAP_REFS_FOLLOW|AGG_GAP_VALUE_REFS_FOLLOW|AGG_COUNT_REFS_TOTAL|AGG_GAP_REFS_TOTAL|AGG_GAP_VALUE_REFS_TOTAL|KEY|REF_TOTAL_LEVEL|PICTURE_COUNT_REFS_FOLLOW|PICTURE_COUNT_REFS_TOTAL|
|001_003_04_DEP_04_1609286400000_1616457600000|0|0.0|0.0|1|-1.0|-3.55|001_003_04_DEP_04_1609286400000_1616457600000|18476|143|147|
|001_003_04_DEP_04_1609286400000_1616457600000|0|0.0|0.0|2|-7.0|-23.95|001_003_04_DEP_04_1609286400000_1616457600000|18476|143|147|
|001_003_04_DEP_04_1609286400000_1616457600000|0|0.0|0.0|3|-8.0|-27.57|001_003_04_DEP_04_1609286400000_1616457600000|18476|143|147|

The records in stream2 have timestamps between now and 3 days ago, so I don't understand.

What could be the reason?
Thanks for your help

Not sure what exactly you are trying to accomplish; not all details are clear. For example, why do you want/need a stream-stream join instead of a stream-table join?

Also note that all joins are temporal joins. Maybe this blog post helps: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/


Thanks for your answer.

I don't want to make a stream/table join because stream2 contains only new data with new keys. It's a snapshot of a pre-calculated aggregation.
There is only one state per key. If I'm not wrong, using a KTABLE will create a local state store that will grow every day without any TTL deletion.
With a KSTREAM I will keep only a few days or hours of data without having to manage a TTL myself.

To give you more context , below a quick summary of my process.

The data in stream2 (pre-calculated by an external database aggregation) is first ingested, for example, at 01:00 AM. Stream2 will not emit any events for the rest of the day.

Then during the day, after 05:00 AM, we make some changes to products; all the events are aggregated using a KEY (the same STRING as the pre-calculated data).
We want to put the pre-calculated data (stream2) together with all the emitted changes of the aggregation, so we try to join the data using that same STRING.

I don't want to make a stream/table join because stream2 contains only new data with new keys.

Why does this prevent you from doing a stream-table join? You read stream2 as a STREAM anyway. Also, I was wondering why you convert table_agg into stream_for_agg?
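For illustration, a stream-table variant might look roughly like this (a sketch only; the topic name `stream2_topic`, the key column declaration, and the Avro value-schema inference are assumptions about your setup, not taken from it):

```sql
-- Sketch: read stream2's underlying topic as a TABLE instead of a STREAM.
-- Assumes the topic is keyed by the department-level string and the value
-- columns can be inferred from the registered Avro schema.
CREATE TABLE `table2` (
  KEY VARCHAR PRIMARY KEY
) WITH (KAFKA_TOPIC='stream2_topic', VALUE_FORMAT='AVRO');

-- Stream-table join: no WITHIN window is needed; each stream record
-- is joined against the table's current value for that key.
SELECT agg.DEPARTMENT_LEVEL,
       pic.COUNT_REFS_FOLLOW,
       pic.COUNT_REFS_TOTAL
FROM `stream_for_agg` agg
  LEFT JOIN `table2` pic
  ON agg.DEPARTMENT_LEVEL = pic.KEY
EMIT CHANGES;
```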

There is only one state per key. If I'm not wrong, using a KTABLE will create a local state store that will grow every day without any TTL deletion.

That's correct. But it doesn't seem to apply here?

The data in stream2 (pre-calculated by an external database aggregation) is first ingested, for example, at 01:00 AM. Stream2 will not emit any events for the rest of the day.

Then during the day, after 05:00 AM, we make some changes to products; all the events are aggregated using a KEY (the same STRING as the pre-calculated data).
We want to put the pre-calculated data (stream2) together with all the emitted changes of the aggregation, so we try to join the data using that same STRING.

This seems to be the crux of why a stream-table join won't work: you are trying to join "past" stream data with "future" table data…

Not sure if the blog post helped you understand what's going on? Note that data is processed in timestamp order, and because you do a LEFT join, if a left input record is processed and there is no match in the right input, you will get a left-join result. If a matching right input record arrives later, you may additionally get the inner-join result.

Currently, stream-stream left joins behave a little oddly in ksqlDB (they may emit a spurious left-join result even if an inner-join result arrives later…). We are currently working on a fix for this. (Cf. https://issues.apache.org/jira/browse/KAFKA-10847)
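Once on a version with that fix, a GRACE PERIOD on the join window is what controls how long the join waits before emitting a final left-join result. A sketch (the GRACE PERIOD clause only exists in newer ksqlDB releases, so treat the syntax and the chosen duration as assumptions about your deployment):

```sql
SELECT agg.DEPARTMENT_LEVEL, picture.KEY
FROM `stream_for_agg` agg
  LEFT JOIN `stream2` picture
  -- the left-join (null) result is only emitted after the grace period
  -- passes with no matching right-side record
  WITHIN 3 DAYS GRACE PERIOD 1 HOUR
  ON agg.DEPARTMENT_LEVEL = picture.KEY
EMIT CHANGES;
```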