Join two streams without matching keys based on most recent timestamp

Hi,

I would like to join two streams that don’t have matching keys. This would be a “full outer join” based on the most recent timestamp.

For example:

Limit Order Stream

timestamp price
09:00 1
09:01 2
09:02 3
09:03 4
09:04 5
09:05 6
09:35 7
09:45 8

Indicator Stream

timestamp signal
08:55 1
09:00 -1
09:05 1
09:10 -1
09:15 1
09:20 -1
09:25 1
09:30 -1

Expected output stream once joined

order_timestamp  price  indicator_timestamp  signal
09:00            1      08:55                1
09:01            2      09:00                -1
09:02            3      09:00                -1
09:03            4      09:00                -1
09:04            5      09:00                -1
09:05            6      09:00                -1
null             null   09:05                1
null             null   09:10                -1
null             null   09:15                1
null             null   09:20                -1
null             null   09:25                1
09:35            7      09:30                -1
09:45            8      09:30                -1

Note:

  • The limit order is null for any indicator that is not used.
  • An indicator will be reused if there are more orders than indicators.
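To pin down the rule, here is a rough batch sketch in plain Python (not Kafka code). It assumes, based only on the example above, that an order joins the most recent indicator whose timestamp is strictly earlier than the order's. The function name `join_latest` and the handling of orders that have no earlier indicator are illustrative assumptions, not part of the original question.

```python
from bisect import bisect_left

def join_latest(orders, indicators):
    """Join each order to the latest indicator strictly before it.

    orders:     list of (timestamp, price), sorted by timestamp
    indicators: list of (timestamp, signal), sorted by timestamp
    Timestamps are zero-padded "HH:MM" strings, so they compare correctly as text.
    """
    ind_times = [ts for ts, _ in indicators]
    matched = [[] for _ in indicators]   # orders attached to each indicator
    orphans = []                         # assumption: orders with no earlier indicator
    for ts, price in orders:
        # bisect_left finds the first indicator with time >= ts;
        # the one just before it is the latest strictly earlier indicator.
        i = bisect_left(ind_times, ts) - 1
        (matched[i] if i >= 0 else orphans).append((ts, price))

    rows = [(ts, price, None, None) for ts, price in orphans]
    for (its, signal), joined in zip(indicators, matched):
        if joined:
            rows.extend((ots, price, its, signal) for ots, price in joined)
        else:
            rows.append((None, None, its, signal))  # unused indicator
    return rows
```

Calling `join_latest` with the two example streams should produce the 13 rows listed in the expected output, in the same order.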

Is this something that can be handled with Kafka Streams, or with the lower-level APIs?

Thank you for your time in advance.

Well, if you don’t have a key to join on, the first issue is that you won’t be able to run the program with more than one partition per input topic…

If one partition is good enough, using the Processor API you might be able to pull it off. In the end, it seems you only want to consider the latest message per input for the join. You can read both inputs and attach a state store for each processor. Each state store should contain at most one message. If you get an input record on the left, you can do a lookup into the right store, and the other way around. Using stateStore#all() you can just scan the full store as you don’t care about the keys.
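As a rough illustration of that idea, the plain-Python sketch below stands in for the two processors and their state stores. It is not Kafka Streams code: the class `LatestValueJoin` and its method names are hypothetical, and each attribute plays the role of a store that holds at most one record.

```python
class LatestValueJoin:
    """Simulates two processors, each with a one-entry state store."""

    def __init__(self):
        self.latest_order = None      # stands in for the left (order) store
        self.latest_indicator = None  # stands in for the right (indicator) store

    def on_order(self, order):
        # Remember the order, then look up the other side's latest record.
        self.latest_order = order
        return (order, self.latest_indicator)

    def on_indicator(self, indicator):
        # Remember the indicator, then look up the other side's latest record.
        self.latest_indicator = indicator
        return (self.latest_order, indicator)


join = LatestValueJoin()
print(join.on_indicator(("08:55", 1)))  # no order seen yet, so the left side is None
print(join.on_order(("09:00", 1)))      # joined with the 08:55 indicator
```

Note that this sketch emits one joined pair for every input record on either side, so the result depends on the order in which records from the two topics are processed.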

Thank you for your reply.

Right now I would need to run the program with one partition, as the entire dataset needs to be processed sequentially. I understand the implications of this, but I wonder whether it suggests Kafka may not be the right tool for the job…

When processing, I always need to look at the “next” message to ensure that the “current” message is joined with the right timestamp. I will look further into the Processor API with a state store and see if I can find a way. Thanks for pointing me in the right direction.
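One way the “look at the next message” idea might work is to hold on to the current indicator and decide what to emit for it only when the next indicator arrives. The sketch below is plain Python, not Kafka Streams code, and rests on several assumptions: both inputs are merged into one time-ordered sequence, orders are processed before indicators on timestamp ties, and the end of input can be detected. The names `merge_by_time` and `stream_join` are hypothetical.

```python
def merge_by_time(orders, indicators):
    """Merge both inputs into one list of (kind, timestamp, value) events."""
    events = [("order", ts, v) for ts, v in orders]
    events += [("indicator", ts, v) for ts, v in indicators]
    # Sort by timestamp; on ties, orders sort first (False < True),
    # so an order still joins the strictly earlier indicator.
    return sorted(events, key=lambda e: (e[1], e[0] != "order"))


def stream_join(events):
    """Yield (order_ts, price, indicator_ts, signal) rows one event at a time."""
    current = None  # most recent indicator seen so far
    used = False    # has any order been joined to `current`?
    for kind, ts, value in events:
        if kind == "indicator":
            # The next indicator has arrived: if the previous one was
            # never used by an order, emit it with a null order.
            if current is not None and not used:
                yield (None, None) + current
            current, used = (ts, value), False
        else:
            # Assumption: an order with no earlier indicator gets nulls.
            yield (ts, value) + (current if current is not None else (None, None))
            used = True
    # End of input: flush the last indicator if it was never used.
    if current is not None and not used:
        yield (None, None) + current
```

Running `list(stream_join(merge_by_time(orders, indicators)))` on the example data should give the expected output rows in order. On an unbounded stream there is no end of input, so the final flush would need some other trigger, such as a timeout.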
