Stream 2 Stream Join with Partial Key

I have two streams that have a few common columns. None of these columns can uniquely identify a particular message. When I join the two streams using one of the columns, some messages get joined with the unintended counterparts.

I am wondering if there are any mechanisms to address partial key joins.

Thanks

Can you give a more concrete example, including what result you would like to get?

Thanks. Here is an example:

  1. Stream 1:

Key: Application

  • Application: App1, Transaction Number: 1, Message Number: 1, Message Type: REQUEST, Message Key: REQ1, Message Timestamp:

  • Application: App1, Transaction Number: 1, Message Number: 2, Message Type: REQUEST, Message Key: REQ1, Message Timestamp:

  2. Stream 2:

Key: Application

Application: App1, Transaction Number: 1, Message Number: 1, Message Type: RESPONSE, Message Key: Data1, Message Timestamp:

Application: App1, Transaction Number: 1, Message Number: 1, Message Type: RESPONSE, Message Key: Data2, Message Timestamp:

  3. Result

When I left join the two streams using the Message Number column, I would like to get the following result:

Application: App1, Transaction Number: 1, Message Number: 1, Request Message Type: REQUEST, Response Message Type: RESPONSE, Request Message Key: REQ1, Response Message Key: Data1, Delta of Message Timestamp

Application: App1, Transaction Number: 1, Message Number: 1, Request Message Type: REQUEST, Response Message Type: RESPONSE, Request Message Key: REQ1, Response Message Key: Data2, Delta of Message Timestamp

Application: App1, Transaction Number: 1, Message Number: 2, Request Message Type: REQUEST, Response Message Type: null, Request Message Key: REQ2, Response Message Key: null, Delta of Message Timestamp: null

Thanks. Let me rephrase to make sure I understand. You have two streams, and you basically want to join both using a composite join attribute Transaction Number + Message Number? And it can happen that a record from the first stream (i.e., a REQUEST) joins to multiple records from the second stream (i.e., multiple RESPONSEs)?

For records in the first stream, if there is no match in the second stream, you still want to get an output.

For your last result example:

Application: App1, Transaction Number: 1, Message Number: 2, Request Message Type: REQUEST, Response Message Type: null, Request Message Key: REQ2, Response Message Key: null, Delta of Message Timestamp: null

Should it be Request Message Key: REQ1 (instead of REQ2) because both inputs from the first stream have Message Key: REQ1 (or should the second record from the first input have Message Key: REQ2 and there is a typo in the input)?

Overall it seems you want to do a left join (to ensure no REQUEST is dropped). The main question might be whether you want to do a stream-stream join or a table-table join (for a table-table join, it would be an FK join).

For the first case, you would first use map() (on both inputs) to set the join columns as the message key (and move the existing key data into the value to preserve it). For a stream-stream join you would just do stream1.leftJoin(stream2, ...) – you will need to figure out the join-window size, i.e., the maximum time between request and response events you want to allow for a join.
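As a concrete sketch of the stream-stream variant: the composite-key construction and the ValueJoiner logic (null delta for unmatched requests in a left join) can be written as plain functions. The field names and the 5-minute window below are hypothetical, and the actual DSL wiring is shown in comments since it needs the kafka-streams dependency:

```java
import java.time.Duration;
import java.time.Instant;

public class JoinHelpers {

    // Composite join key: Transaction Number + Message Number.
    // Applied via map() on both inputs before the join, roughly:
    //   stream1.map((k, v) -> KeyValue.pair(joinKey(v.txn, v.msgNo), v))
    //          .leftJoin(rekeyedStream2,
    //                    (req, resp) -> ...,   // build the joined output record
    //                    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));
    static String joinKey(int transactionNumber, int messageNumber) {
        return transactionNumber + "|" + messageNumber;
    }

    // Delta logic for the joiner: the response side is null for
    // left-join results, so the delta is null as well.
    static Long timestampDelta(Instant requestTs, Instant responseTs) {
        return responseTs == null
                ? null
                : Duration.between(requestTs, responseTs).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(joinKey(1, 1)); // 1|1
        System.out.println(timestampDelta(
                Instant.ofEpochMilli(1000), Instant.ofEpochMilli(1500))); // 500
        System.out.println(timestampDelta(Instant.ofEpochMilli(1000), null)); // null
    }
}
```

Because a REQUEST can match multiple RESPONSEs inside the window, the join naturally emits one output record per matching response, which matches the desired result above.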

If you want to do a table-table join, you would only set a new key for the right input, and upsert both streams into tables via stream.toTable(). You can join both tables using a key extractor for the left input table that returns the combined join attributes: table1.leftJoin(table2, leftTableKeyExtractor). For this case, you need to be aware that tables might grow unbounded if you have an unbounded key space.
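For the table-table variant, the key extractor is just a function from the left table's value to the right table's key. A minimal sketch, with an illustrative record type (the `Request` fields are hypothetical, not from any Kafka Streams API):

```java
public class FkExtractor {

    // Illustrative left-table value (the request record).
    record Request(String application, int transactionNumber,
                   int messageNumber, String messageKey) {}

    // The extractor passed as the second argument to
    //   table1.leftJoin(table2, FkExtractor::foreignKey, joiner)
    // returns the combined join attributes that form the right table's key.
    static String foreignKey(Request r) {
        return r.transactionNumber() + "|" + r.messageNumber();
    }

    public static void main(String[] args) {
        Request req = new Request("App1", 1, 1, "REQ1");
        System.out.println(foreignKey(req)); // 1|1
    }
}
```

Note the caveat from above still applies: both tables are materialized, so with an unbounded key space the state grows unbounded unless you compact or expire it.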

You might want to check out: https://www.confluent.io/kafka-summit-sf18/zen-and-the-art-of-streaming-joins/


Thanks Matthias. I appreciate the detailed explanation. The sample data is correct. Basically, a transaction may have one or more request messages. A request message may have zero or more corresponding response records. A reliable identifier for a particular pair of request/response records would be:

Transaction Number + Message Number + Request Message Key + Response Message Key.

The identifier fields are split between the records in the two streams (Stream 1 & Stream 2).

The REQ1 Request Record has two corresponding Response Records (Data1, Data2).

Yes, we have a window size defined. I will explore the map() step.

Would it help if all the events are read from the source as a single stream and processed as one stream instead of two separate streams? Basically, we won’t have to do any joins in this case? But we will need to create a new stream/table using a query that filters and groups events? How would we calculate the timestamp delta in this scenario?

Thanks a ton 🙂

Interesting. It’s still unclear what the overall goal is. Do you want to stitch together all messages (requests and responses) for each transaction? For this case, you would need an aggregation step at some point for sure.

Or do you only want to stitch together a single request to all its responses?

I need to stitch together a certain request message with all the applicable response messages based on some field values in both the request and response messages. The aggregation I need to perform is not a standard aggregation function like count, etc. I need to calculate how much time it took to get back the response.

I see, so it actually seems that you could do a “manual” stream-table join, i.e., you will need to use the Processor API to build it manually:

You can use two transform() with a shared KeyValueStore. The first transform() reads the request topic and buffers all requests in the state store. The second transform() processes the responses, does range lookups into the KeyValueStore based on Transaction Number and Message Number, and produces the join results.

You will also need some “expiration” mechanism that allows you to remove old requests from the state store. During this expiration, you might also need to emit “left join results” for requests with no responses – thus, you need to do some additional bookkeeping in the state store to track whether a request was joined or not: when adding a new request to the store, this “joined flag” would be set to false, and if a request is actually joined, you would update it to true.

In Apache Kafka 2.8.0, we added prefixScan() to KeyValueStores, which should be helpful for the range lookups.
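The buffering, prefix lookup, joined-flag bookkeeping, and expiration described above can be sketched end to end in plain Java. Here a TreeMap stands in for the shared KeyValueStore, and subMap() mimics a prefixScan()-style range lookup; all names are illustrative, not Kafka Streams API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ManualJoinSketch {

    // Store entry: request payload plus the "joined" flag used at expiration.
    static class BufferedRequest {
        final String messageKey;
        final long timestamp;
        boolean joined = false;
        BufferedRequest(String messageKey, long timestamp) {
            this.messageKey = messageKey;
            this.timestamp = timestamp;
        }
    }

    // Keys look like "txn|msgNo|reqKey" so all requests for one
    // Transaction Number + Message Number share a common prefix.
    final TreeMap<String, BufferedRequest> store = new TreeMap<>();
    final List<String> results = new ArrayList<>();

    // First transform(): buffer each request; joined flag starts false.
    void onRequest(int txn, int msgNo, String reqKey, long ts) {
        store.put(txn + "|" + msgNo + "|" + reqKey, new BufferedRequest(reqKey, ts));
    }

    // Second transform(): prefix range lookup, emit one join result per
    // matching request, and mark those requests as joined.
    void onResponse(int txn, int msgNo, String respKey, long ts) {
        String prefix = txn + "|" + msgNo + "|";
        for (Map.Entry<String, BufferedRequest> e :
                store.subMap(prefix, prefix + Character.MAX_VALUE).entrySet()) {
            BufferedRequest req = e.getValue();
            req.joined = true;
            results.add(req.messageKey + "->" + respKey + " delta=" + (ts - req.timestamp));
        }
    }

    // Expiration: drop old requests; emit left-join results for unjoined ones.
    void expire(long olderThan) {
        store.entrySet().removeIf(e -> {
            if (e.getValue().timestamp >= olderThan) return false;
            if (!e.getValue().joined) {
                results.add(e.getValue().messageKey + "->null delta=null");
            }
            return true;
        });
    }

    public static void main(String[] args) {
        ManualJoinSketch j = new ManualJoinSketch();
        j.onRequest(1, 1, "REQ1", 100);
        j.onResponse(1, 1, "Data1", 150);
        j.expire(1000);
        j.results.forEach(System.out::println); // REQ1->Data1 delta=50
    }
}
```

In a real topology, expire() would run from a punctuator on a wall-clock or stream-time schedule, and the results would be forwarded downstream instead of collected in a list.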


Thanks. I will give this a shot. I will share the final outcome.
