Enriching stream by aggrigation data

Hello.
Could you please assist with this task?
Have a stream
CREATE STREAM my_stream
( row_id STRING KEY
, session_id STRING
, date_created BIGINT
)
And I need to enrich it with some aggregation data (min, max…) like
CREATE STREAM enrich_stream
( row_id STRING KEY
, session_id STRING
, date_created BIGINT
, min_date_created_by_session_id BIGINT
, max_date_created_by_session_id BIGINT
)
The result stream should have the same amount of messages as the source stream.
Thanks in advance

Hi there!

So you pretty much have the right idea there except that you need a TABLE rather than a stream because you’re aggregating.

CREATE TABLE enrich_stream WITH (
    kafka_topic='enrich_stream',
) AS SELECT 
    session_id, 
    MIN( date_created ),
    MAX( date_created ) 
FROM my_stream 
GROUP BY session_id 
EMIT CHANGES;

Note: I would only group by session_id to make sure you get exactly the aggregation you want. If you need more data from the input stream, join it back onto the stream later on.

You will get an output message for each input (achieving what you wanted), but keep in mind that the MIN and MAX will change over time for a given session id because you’re evaluating on a per event basis… BUT the default functionality for tables in ksqlDB is to compact the events and retain only the latest value per key; so in order to keep all of those events over time, you should create the 'enrich_stream' ahead of time and not set it to be compact.

This feels a bit hacky to me, though. I’d like to understand your use case a bit better so that I might potentially offer a better solution.

Hi. Thanks for answer. It was my first attempt for resolve the task, but haven’t seccess. Let me explain my case more detail. I have a session message with a lot of params, and in result I need the current params and all params related to first messages in session.
So my source stream

CREATE STREAM my_stream
( row_id STRING KEY
, session_id   STRING
, date_created BIGINT
, param_1      STRING
);

Also I created the table

CREATE TABLE agg_table WITH (
    kafka_topic='agg_table',
) AS SELECT 
    session_id, 
    MIN( date_created )           as min_date_created_by_session_id,
    EARLIEST_BY_OFFSET( param_1 ) as min_param_1_by_session_id
FROM my_stream 
GROUP BY session_id 
EMIT CHANGES;

And after that result stream

CREATE STREAM enrich_stream WITH (
    kafka_topic='enrich_stream',
) AS SELECT 
    s.row_id,
    s.session_id, 
    date_created,
    param_1, 
    min_date_created_by_session_id,
    min_param_1_by_session_id
FROM my_stream s
     JOIN agg_table t ON s.session_id = t.session_id
EMIT CHANGES;

The main problem was when I joined stream and table sometimes I had data from table, sometimes not .
Due to documentation
Only events arriving on the stream side trigger downstream updates and produce join output. Updates on the table side don’t produce updated join output.
That mean when I join stream with table some data already got aggregated (already in table), some not.
If we had some windowing (as we have in stream-stream join) or waiting, it would be resolve my issue. But may be you know good solution for this?

This topic was automatically closed after 30 days. New replies are no longer allowed.