KStream-KStream join based on a common field, not on the key

Hi Everyone,

We want to do a KStream-KStream join based on a common field (primary key). Currently, with the code below, the result looks as if the two streams were just merged, without any primary-key constraint.

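import java.time.Duration
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.serialization.Serdes._

val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)

userRegions.join(regionMetrics)(
  (regionValue, metricValue) => regionValue + "/" + metricValue,
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)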

Could you please suggest how to join two streams based on a common field/column?

A KStream does not have a primary key, only a key that is used to partition the data. A stream-stream join is always done on the key. Not sure what you mean by “just merging 2 Streams”: if a record from the left or right stream does not find a matching record with the same key in the other stream (within the join window), it should be dropped.
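To make the “dropped if there is no match” rule concrete, here is a deliberately simplified, in-memory model of an inner windowed stream-stream join in plain Scala. It is not Kafka Streams code: the `Rec` type and the `innerJoin` function are hypothetical, and the model ignores arrival order, grace periods and state-store retention. It only shows which pairs a join on the record key would emit.

```scala
object WindowedJoinModel {
  // A record as the join sees it: key (None stands for a null key), value, timestamp in ms.
  final case class Rec(key: Option[String], value: String, ts: Long)

  // Emits one result per (left, right) pair that has the same non-null key and
  // timestamps at most windowMs apart. Records without such a partner produce
  // nothing, so an inner join never "just merges" two streams.
  def innerJoin(left: Seq[Rec], right: Seq[Rec], windowMs: Long): Seq[(String, String)] =
    for {
      l <- left
      k <- l.key.toSeq
      r <- right
      if r.key.contains(k) && math.abs(l.ts - r.ts) <= windowMs
    } yield k -> (l.value + "/" + r.value)
}
```

With a 5-minute window, a left record with key "1" at time 0 joins a right record with key "1" one minute later, but not one with key "2", not one with key "1" ten minutes later, and a left record with a null key joins nothing.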

If you want to join on some attribute that is contained in the value you can do a selectKey() to set a new key:

userRegions.selectKey(...).join(regionMetrics.selectKey(...))(...)
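Spelled out for the two topics discussed in this thread, the re-key-then-join pattern might look like the sketch below. Assumptions: the Kafka Streams Scala DSL (kafka-streams-scala) and kafka-streams-test-utils are on the classpath; Kafka 3.0 or newer, for `JoinWindows.ofTimeDifferenceWithNoGrace` (on 2.x, use `JoinWindows.of(...)` as in the question); and CSV values whose first field is the id. The object name, the topic names and the `idOf` helper are hypothetical. The `demo` method pipes three records through `TopologyTestDriver`, so you can see that only the matching id produces output.

```scala
import java.time.{Duration, Instant}
import java.util.Properties

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver}
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.serialization.Serdes._

object RekeyAndJoin {
  // Hypothetical helper: assumes the id is the first CSV field, e.g. "1,alice,berlin".
  private def idOf(csv: String): String = csv.split(",")(0).trim

  def topology(in1: String, in2: String, out: String): Topology = {
    val builder = new StreamsBuilder
    // selectKey only replaces the key; the value stays the full CSV string.
    val left: KStream[String, String] =
      builder.stream[String, String](in1).selectKey((_, v) => idOf(v))
    val right: KStream[String, String] =
      builder.stream[String, String](in2).selectKey((_, v) => idOf(v))

    // Because the key changed, Kafka Streams repartitions both sides before the join.
    // Records are joined when their ids match and timestamps are at most 5 minutes apart.
    left.join(right)(
      (l, r) => l + "/" + r,
      JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5L))
    ).to(out)
    builder.build()
  }

  // Feeds a few records through TopologyTestDriver and returns the join output.
  def demo(): List[(String, String)] = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rekey-join-demo")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092")
    val driver = new TopologyTestDriver(topology("topic1", "topic2", "joined"), props)
    try {
      val t1 = driver.createInputTopic("topic1", new StringSerializer, new StringSerializer)
      val t2 = driver.createInputTopic("topic2", new StringSerializer, new StringSerializer)
      val out = driver.createOutputTopic("joined", new StringDeserializer, new StringDeserializer)

      val t0 = Instant.ofEpochMilli(0L)
      t1.pipeInput(null: String, "1,alice,berlin", t0)             // no key yet
      t2.pipeInput(null: String, "1,555-1234", t0.plusSeconds(60)) // same id, 1 minute later
      t2.pipeInput(null: String, "2,555-0000", t0.plusSeconds(90)) // id 2 has no partner

      val results = scala.collection.mutable.ListBuffer.empty[(String, String)]
      while (!out.isEmpty) {
        val kv = out.readKeyValue()
        results += (kv.key -> kv.value)
      }
      results.toList
    } finally driver.close()
  }
}
```

Keeping the full CSV string as the value means the joiner still sees all columns; only the key is narrowed to the id.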

If you really want to have primary-key semantics, you would need to read the data as a KTable instead of a KStream.
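A minimal KTable-KTable join sketch, assuming both topics are already keyed by id (a KTable source skips records with a null key, so null-keyed data would first need to be re-keyed and written to a new topic). It assumes the Kafka Streams Scala DSL and kafka-streams-test-utils; the object and topic names are hypothetical. Unlike the stream-stream join, there is no window: each key holds only its latest value, and the join result is updated whenever either side changes.

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KTable
import org.apache.kafka.streams.scala.serialization.Serdes._

object TableJoin {
  def topology(usersTopic: String, contactsTopic: String, out: String): Topology = {
    val builder = new StreamsBuilder
    // A KTable keeps only the latest value per key: primary-key semantics.
    val users: KTable[String, String] = builder.table[String, String](usersTopic)
    val contacts: KTable[String, String] = builder.table[String, String](contactsTopic)
    // Inner table-table join on the key; no join window is involved.
    users.join(contacts)((user, contact) => user + "/" + contact).toStream.to(out)
    builder.build()
  }

  // Returns the latest joined value per key after a few updates.
  def demo(): Map[String, String] = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "table-join-demo")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092")
    val driver = new TopologyTestDriver(topology("users", "contacts", "joined"), props)
    try {
      val users = driver.createInputTopic("users", new StringSerializer, new StringSerializer)
      val contacts = driver.createInputTopic("contacts", new StringSerializer, new StringSerializer)
      val out = driver.createOutputTopic("joined", new StringDeserializer, new StringDeserializer)

      users.pipeInput("1", "alice,berlin")
      contacts.pipeInput("1", "555-1234")
      contacts.pipeInput("2", "555-0000") // no user with id 2, so no joined row
      users.pipeInput("1", "alice,paris") // replaces the previous row for key 1

      var latest = Map.empty[String, String]
      while (!out.isEmpty) {
        val kv = out.readKeyValue()
        // A null value (tombstone) would mean the joined row was deleted.
        if (kv.value != null) latest += (kv.key -> kv.value) else latest -= kv.key
      }
      latest
    } finally driver.close()
  }
}
```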


Hi @mjsax,

topic1 -> id,name,address
topic2 -> id,contact

We want to do an inner join based on id.

val topic1: KStream[String, String] = builder.stream(inputTopic1)
val topic2: KStream[String, String] = builder.stream(inputTopic2)
// selectKey((k, v) => v) uses the entire value as the new key
topic1.selectKey((k, v) => v).join(topic2.selectKey((k, v) => v))(
  (topic1Value, topic2Value) => topic1Value + "/" + topic2Value,
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)

We are trying the code above, but we do not see any records in the output topic. We are not sure how to use selectKey() correctly.

Could you please suggest on this?

> topic1-> id,name,address
> topic2->id,contact

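Not sure what this means exactly. Kafka uses a key-value data model, so the question is: which attributes are stored in the key, and which are stored in the value? Using selectKey((k,v) -> v) means you copy the entire value into the key. Because a full topic1 value ("id,name,address") never equals a full topic2 value ("id,contact"), no two records end up with the same key, and the join produces no output. It seems you want to put only the id into the key, though.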

As you use the String data type, could it be that your value contains a CSV string "id,name,address"? In this case, you could extract the id using split(): selectKey((k,v) -> v.split(",")[0]) (array indexes start at 0, so [0] is the first field). This would transform your key-value pairs

<null, "id,name,address">

(note that there is no key yet, so it’s just null) to

<"id", "id,name,address">
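The same transformation, checked with `TopologyTestDriver` in Scala, might look like the sketch below. It assumes the Kafka Streams Scala DSL and kafka-streams-test-utils; the object and topic names and the sample record are hypothetical. Reading the output with `readKeyValue()` shows both halves of the record: the key is now the id, and the value is unchanged.

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

object SelectKeyDemo {
  // Re-keys each record by its first CSV field (index 0); the value passes through as-is.
  def topology(input: String, output: String): Topology = {
    val builder = new StreamsBuilder
    builder.stream[String, String](input)
      .selectKey((_, v) => v.split(",")(0))
      .to(output)
    builder.build()
  }

  // Pipes <null, "7,alice,berlin"> in and returns the (key, value) that comes out.
  def demo(): (String, String) = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "select-key-demo")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092")
    val driver = new TopologyTestDriver(topology("topic1", "rekeyed"), props)
    try {
      val input = driver.createInputTopic("topic1", new StringSerializer, new StringSerializer)
      val output = driver.createOutputTopic("rekeyed", new StringDeserializer, new StringDeserializer)
      input.pipeInput(null: String, "7,alice,berlin")
      val kv = output.readKeyValue()
      (kv.key, kv.value)
    } finally driver.close()
  }
}
```

Using index 1 instead would make the name ("alice") the key, not the id.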

Hi @mjsax

Thanks for your input. I tried the same thing; below is the code I am using:

  def kStreamSelectKey(inputTopic1: String, outputTopicName: String): Topology = {
    val builder: StreamsBuilder = new StreamsBuilder
    val topic1: KStream[String, String] = builder.stream(inputTopic1)
    // split(",") is 0-based: index 0 is the first field (the id), index 1 would be the name
    val newStream = topic1.selectKey((k, v) => v.split(",")(0))
    newStream.to(outputTopicName)
    builder.build()
  }

I used selectKey() and wrote the newly generated stream to a new output topic. But in that topic I see exactly the same values as in topic1.

I am still not getting the first column of my values.

I am very new to this topic. I am just trying to take two streams and join them on some common unique column they share.
But as I understand it, a KStream join is based on the keys that are generated.

Could you please point me to some code or references that use selectKey() to join on a unique column of the user's choice?

> I used selectKey() and tried writing the new generated stream to a new output topic. But there I can see exact same stream of values coming as of topic1.

selectKey() changes only the key, not the value. So I guess this is expected behavior? How did you inspect the result topic outputTopicName? Some command-line tools need a flag to include the message key in their output and show only the value by default.

> But what I understand kstream basically joins based on keys that are generated.

Not sure what you mean by “generated”. The join is based on the message key. If your messages do not have a key (i.e., the key is null), you need to set the key before the join.

You can find more examples on GitHub in the confluentinc/kafka-streams-examples repository (branch 6.2.0-post, directory src/main/java/io/confluent/examples/streams). I did not double-check, but I assume some of them contain a stream-stream join.

@mjsax

We are using data generated by the datagen tool. As far as we understand, the keys are things like timestamp, offset, partition, etc. We are trying to join based on value columns like orderid, itemid, etc.

Timestamp, offset, and partition are independent of the key. Partition and offset are just metadata describing where exactly a message is stored within a topic; together they uniquely identify a message.

Also, each message has a timestamp that is (by default) set by the producer when the message is written.

But the actual “payload” of a message is a key and a value. It could be that the datagen tool does not use the key and sets it to null, though.

(Btw: each message can also have headers; just mentioning it for completeness.)

If you use kafka-console-consumer to inspect a topic, you need to add the command-line argument --property print.key=true to see the key. Otherwise, the tool prints only the value.
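If you inspect a topic from code rather than with the console consumer, a small formatter like the hypothetical `DescribeRecord.describe` below shows the key together with the metadata mentioned above. It assumes kafka-clients on the classpath; in practice you would call it on each record returned by `KafkaConsumer.poll`. Partition, offset and timestamp come from the record's metadata, not from its key.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord

object DescribeRecord {
  // Formats one consumed message: metadata first, then key and value (a null key prints as "null").
  def describe(r: ConsumerRecord[String, String]): String =
    s"partition=${r.partition()} offset=${r.offset()} timestamp=${r.timestamp()} " +
      s"key=${r.key()} value=${r.value()} headers=${r.headers().toArray().length}"
}
```

A record built with the five-argument `ConsumerRecord(topic, partition, offset, key, value)` constructor has no timestamp (-1) and no headers, which is why a null-keyed test record formats as `partition=0 offset=42 timestamp=-1 key=null value=...`.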

@mjsax,

Thank you, selectKey() works fine for setting the specific key needed to join the two streams.
