Historic data load when joining (or aggregating) 2 streams

Say you have 2 streams: chat (keyed by chatId on the topic, but rekeyed to userId for this join) and reaction-info (keyed by userId).

Whenever a new chat event is created, I want to join it to the latest reaction-info sent before the chat event. Users can have multiple chats and multiple reaction-infos.

I can either do a left join to a reaction-info table,
or reduce and compare event timestamps to ensure we get a reaction-info from before the chat event,
or probably lots of other things.

However, I also have a DB with historic data for these events and, short of passing them in order, I don’t know how I can ensure that I get the most up-to-date reaction-info event occurring before each chat event. Guidance on an approach to this would be huuuugely appreciated.

I’d prefer to reduce on two streams to allow for corrective updates but would appreciate suggestions on how to load the history correctly with the reaction-info being a stream or a table.

Are there some handy settings that make this waaay easier to do on a table than it is to do in a reduce?


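In general, Kafka Streams processes records in timestamp order: when a task has data on several input partitions, it picks the record with the smallest timestamp next. Thus, if your input data is properly timestamped, it should work out-of-the-box. Increasing the config max.task.idle.ms (how long a task waits for data on an empty input partition before processing what it already has) should help to improve the synchronization.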
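
A minimal sketch of setting that config, assuming the properties are later passed to the KafkaStreams constructor. The application id, broker address, and the 5000 ms value are placeholders rather than recommendations. The keys are written as string literals so the snippet compiles without Kafka on the classpath; StreamsConfig.MAX_TASK_IDLE_MS_CONFIG is the equivalent constant.

```java
import java.util.Properties;

public class IdleConfigSketch {
    // Builds Kafka Streams properties with an explicit max.task.idle.ms.
    static Properties buildProps(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("application.id", "chat-reaction-join");  // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
        // Wait up to this long for data on empty input partitions before
        // processing, so both topics can be read in timestamp order.
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps(5000L);
        System.out.println(props.getProperty("max.task.idle.ms"));
    }
}
```

A larger value trades latency for better ordering: the task may sit idle for up to that long whenever one of its inputs has no buffered data.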

Cf. https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/

If you need to handle out-of-order data it’s more complex and it might be best to use the Processor API… We are working on improved out-of-order record handling for stream-table joins already, but it will take some time until we can ship it…
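
To illustrate the kind of logic a custom processor would need for out-of-order data, here is a rough plain-Java sketch of the lookup only, not of the Processor API itself. It keeps every reaction-info per user sorted by event timestamp, so a chat event can find the latest one at or before its own timestamp regardless of arrival order. The class and method names are made up for this example, and a real processor would presumably keep this in a state store and expire old entries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class ReactionHistory {
    // userId -> (event timestamp -> reaction-info payload), sorted by timestamp
    private final Map<String, TreeMap<Long, String>> byUser = new HashMap<>();

    // Records a reaction-info; arrival order does not matter.
    void addReaction(String userId, long timestamp, String reactionInfo) {
        byUser.computeIfAbsent(userId, k -> new TreeMap<>())
              .put(timestamp, reactionInfo);
    }

    // Returns the latest reaction-info with timestamp <= chatTimestamp,
    // or null if the user has none that early.
    String latestAtOrBefore(String userId, long chatTimestamp) {
        TreeMap<Long, String> versions = byUser.get(userId);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(chatTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        ReactionHistory history = new ReactionHistory();
        history.addReaction("user-1", 300L, "angry");  // arrives first, but is newest
        history.addReaction("user-1", 100L, "happy");  // out-of-order arrival
        System.out.println(history.latestAtOrBefore("user-1", 200L)); // prints "happy"
        System.out.println(history.latestAtOrBefore("user-1", 50L));  // prints "null"
    }
}
```

Swapping floorEntry for lowerEntry would make the match strictly before the chat timestamp instead of at or before it.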

Thanks for the reply mjsax!

Is this true even of GlobalKTables? The table will be empty on startup, so I’m presuming what you’ve said still applies. I’m in control of when the events get produced, so I can certainly produce them in order with a larger max.task.idle.ms on the streams app.

Thanks again, temporal-joins sound fuuuuuuun!

No. GlobalKTables are not time-synchronized. I did not realize that you are using a GlobalKTable.

Okay cool,

I was just clarifying the exceptions to the rule in my own head.

Thanks again!
