Is a KStream-KTable foreign-key join possible?

I have two topics. Topic car is compacted; its key is the VIN and its value is the fuel type, for example:

  • ab: gasoline
  • cd: gasoline
  • ef: diesel
  • gh: electricity

Topic fuel is a stream of price changes; its key is the fuel type and its value is the price, for example:

  • gasoline: 10
  • diesel: 12
  • gasoline: 11

In Kafka Streams terms, car is a KTable and fuel is a KStream.

Is it possible to construct a topology that, for each fuel price change, produces events for the affected cars? In the given case:

Input
gasoline: 10
Output:
ab-gasoline-10
cd-gasoline-10

Input
diesel: 12
Output:
ef-diesel-12

Input
gasoline: 11
Output:
ab-gasoline-11
cd-gasoline-11
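To pin down the expected semantics, here is a plain-Java, in-memory model of the desired behaviour (no Kafka involved). The class and method names are hypothetical; the model simply scans the car table for every fuel event, which is exactly the fan-out a streaming topology would have to reproduce without a full scan.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the desired behaviour (not Kafka Streams code):
// every fuel price event fans out to all cars that currently use that fuel.
class FuelPriceFanOut {

    // Emits "vin-fuel-price" for each car whose fuel matches the event key.
    static List<String> fanOut(Map<String, String> carTable, String fuel, int price) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> car : carTable.entrySet()) {
            if (car.getValue().equals(fuel)) {
                out.add(car.getKey() + "-" + fuel + "-" + price);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Car table from the question: key = VIN, value = fuel (insertion order kept).
        Map<String, String> cars = new LinkedHashMap<>();
        cars.put("ab", "gasoline");
        cars.put("cd", "gasoline");
        cars.put("ef", "diesel");
        cars.put("gh", "electricity");

        System.out.println(fanOut(cars, "gasoline", 10)); // [ab-gasoline-10, cd-gasoline-10]
        System.out.println(fanOut(cars, "diesel", 12));   // [ef-diesel-12]
    }
}
```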

I read about the foreign-key join, which would look something like this:

fuelStream.join(
  carTable,
  Car::fuel,
  (fuelPrice, car) -> car.vin() + "-" + car.fuel() + "-" + fuelPrice.price())

But the foreign-key join is only available as a KTable-KTable join, not as a KStream-KTable join.

Is it possible to construct such a topology?

Hi @michaldo,

Off the top of my head, there are two options I can think of.

  1. On the car table, use KTable.toStream() followed by KStream.map(KeyValueMapper), where the KeyValueMapper inverts the structure of the record, i.e. (key, value) -> KeyValue.pair(value, key), then perform the join fuelStream.join(convertedTableStream, ....)
  2. Convert the fuel stream to a table with KStream.toTable(), then use the foreign-key join, since you now have two KTable instances
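Option 1 hinges on re-keying the car records so that the fuel becomes the key. A minimal in-memory sketch of that inversion, assuming plain Java collections in place of Kafka records (the class name InvertCarRecords is hypothetical):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Models option 1's re-keying step: every car-table record (vin, fuel)
// becomes a stream record (fuel, vin), so the fuel is now the join key.
class InvertCarRecords {

    static List<Map.Entry<String, String>> invert(Map<String, String> carTable) {
        List<Map.Entry<String, String>> stream = new ArrayList<>();
        // Same shape as the mapper (key, value) -> KeyValue.pair(value, key)
        carTable.forEach((vin, fuel) -> stream.add(new SimpleEntry<>(fuel, vin)));
        return stream;
    }

    public static void main(String[] args) {
        Map<String, String> cars = new LinkedHashMap<>();
        cars.put("ab", "gasoline");
        cars.put("cd", "gasoline");
        cars.put("ef", "diesel");
        cars.put("gh", "electricity");
        System.out.println(invert(cars)); // [gasoline=ab, gasoline=cd, diesel=ef, electricity=gh]
    }
}
```

In the DSL this would roughly correspond to carTable.toStream().map((vin, fuel) -> KeyValue.pair(fuel, vin)); because map changes the key, Kafka Streams repartitions the result before the downstream join so that records with the same fuel land in the same partition.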

HTH,
Bill

Re 1: fuelStream.join(convertedTableStream, ....) is a KStream-KStream join and needs JoinWindows. What window should I use? I want to emit events immediately after a fuel price change arrives.
Re 2: When I call fuelStream.toTable(), some price changes may be lost, because a table only keeps the latest value per key. What if I want to process all events from fuelStream?
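The concern in point 2 can be illustrated with a simplified model of a de-duplicating table cache. Kafka Streams' KTable record cache may consolidate consecutive updates to the same key before forwarding them downstream, depending on cache size and commit interval; the sketch below assumes a plain map buffer flushed at a "commit" and is not the actual cache implementation (TableCacheModel is a hypothetical name).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a de-duplicating table cache: updates are buffered per key,
// and only the latest value per key is forwarded when the buffer is flushed.
class TableCacheModel {
    private final Map<String, Integer> buffer = new LinkedHashMap<>();
    private final List<String> forwarded = new ArrayList<>();

    void put(String fuel, int price) {
        buffer.put(fuel, price); // overwrites any earlier, not-yet-forwarded price
    }

    void flush() {
        buffer.forEach((fuel, price) -> forwarded.add(fuel + ":" + price));
        buffer.clear();
    }

    List<String> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        TableCacheModel table = new TableCacheModel();
        // The three fuel events from the question arrive before a single flush.
        table.put("gasoline", 10);
        table.put("diesel", 12);
        table.put("gasoline", 11);
        table.flush();
        System.out.println(table.forwarded()); // [gasoline:11, diesel:12]
    }
}
```

The price gasoline:10 never reaches the downstream join in this model; flushing after every put would forward all three updates.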

Based on what you’re saying here, I’d try option one first: convert the table to a stream. For the JoinWindows, choose a window size you consider acceptable so that records from both sides of the join fall within it.

Let’s say cars change rarely but fuel prices change often.
If I convert the car KTable into a stream, initially there will be 4 events:

  • gasoline:ab
  • gasoline:cd
  • diesel:ef
  • electricity:gh

Initially, converted-car events and fuel events may match within the JoinWindow. But as time passes, new fuel events arrive and will not find partners on the converted-car topic within the window,
because the converted-car KStream is almost idle: new events arrive only when a car is added or deleted, which is rare.
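The problem described above can be reproduced with a simplified model of a symmetric stream-stream window join, in which a fuel event matches a converted-car event with the same key only if their timestamps differ by at most the window size. This is an in-memory sketch with timestamps in minutes, not Kafka code; it only models the lookup triggered by a fuel event and ignores grace periods (WindowJoinModel and its records are hypothetical names).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a symmetric KStream-KStream window join (not Kafka code).
// Only the lookup triggered by a fuel event is modelled; timestamps are minutes.
class WindowJoinModel {

    record CarEvent(String fuel, String vin, long minute) {}

    record FuelEvent(String fuel, int price, long minute) {}

    static List<String> join(List<CarEvent> cars, FuelEvent fuel, long windowMinutes) {
        List<String> out = new ArrayList<>();
        for (CarEvent car : cars) {
            boolean sameKey = car.fuel().equals(fuel.fuel());
            boolean inWindow = Math.abs(fuel.minute() - car.minute()) <= windowMinutes;
            if (sameKey && inWindow) {
                out.add(car.vin() + "-" + fuel.fuel() + "-" + fuel.price());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Both gasoline cars were converted to stream records at minute 0.
        List<CarEvent> cars = List.of(
                new CarEvent("gasoline", "ab", 0),
                new CarEvent("gasoline", "cd", 0));
        long window = 60;

        System.out.println(join(cars, new FuelEvent("gasoline", 10, 30), window));  // [ab-gasoline-10, cd-gasoline-10]
        System.out.println(join(cars, new FuelEvent("gasoline", 11, 120), window)); // []
    }
}
```

The second fuel event finds no partner because the idle car side has fallen out of range; a larger window only postpones that point.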

I get your point, but with the way the data is structured, you’re limited in your options.
If the fuel stream carried the foreign key, you could convert the car KTable to a GlobalKTable and then do a KStream-GlobalKTable join. That would work because the KStream-GlobalKTable join takes a KeyValueMapper parameter that maps each stream-side record to the primary key of the table, i.e. it extracts a foreign key from the stream side of the join.
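A simplified in-memory model of the KStream-GlobalKTable join described above: a KeyValueMapper-like function derives the table's primary key from each stream record, followed by a single lookup. The record FuelWithVin is hypothetical, since the real fuel topic does not carry a VIN; the names and result format are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Simplified model of a KStream-GlobalKTable join (not Kafka code): the key mapper
// derives the table key from the stream record, then exactly one lookup is made,
// so each stream record yields at most one joined result.
class GlobalTableLookupModel {

    // Hypothetical fuel event that carries the car's VIN (the real topic does not).
    record FuelWithVin(String vin, int price) {}

    static Optional<String> join(Map<String, String> carsByVin,
                                 String fuel, FuelWithVin event,
                                 BiFunction<String, FuelWithVin, String> keyMapper) {
        String vin = keyMapper.apply(fuel, event); // stream record -> table primary key
        String carFuel = carsByVin.get(vin);       // single lookup in the global table
        if (carFuel == null) {
            return Optional.empty();               // inner join: no matching row, no output
        }
        return Optional.of(vin + "-" + carFuel + "-" + event.price());
    }

    public static void main(String[] args) {
        Map<String, String> cars = new HashMap<>();
        cars.put("ab", "gasoline");
        cars.put("ef", "diesel");

        Optional<String> result =
                join(cars, "gasoline", new FuelWithVin("ab", 10), (fuel, e) -> e.vin());
        System.out.println(result.orElse("no match")); // ab-gasoline-10
    }
}
```

Because each stream record maps to one table key, this join cannot by itself fan a single gasoline price out to both ab and cd.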

Is it possible to update the fuel stream topic?
For the fuel/converted-car join, you could set the JoinWindows to a large value such as 24 hours. Would that work?

Regarding "extract foreign key from the stream-side":

That is syntactic sugar for KStream.selectKey() and does not solve my problem, because I want to use the foreign key from the table side.

Let's say I transform the car KTable into a converted-car KStream and join it with the fuel KStream using a 24-hour window.
Let's say a new car is added at 07:25 and published as converted-car (gasoline:ij).
Then fuel event gasoline:13 arrives at 07:55 and is matched with the new car ij.
What happens when a rebalance (partition reassignment) occurs at 08:25 and a new node takes over the converted-car partition that holds the gasoline key? Will the new node still match (gasoline:ij), or will the offset of car ij already be committed, so that the new node matches only new cars?

If there is a reassignment of partitions, the new Kafka Streams client will restore the state stores backing the join from their changelog topics, so the new node will still match gasoline:ij.
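The restore behaviour described in this answer can be sketched with a simplified in-memory model: every write to the local join state is also appended to a changelog, and a node that takes over the partition rebuilds its store by replaying that changelog. This is not Kafka code; window expiry and retention are ignored, and ChangelogRestoreModel and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of state-store restoration from a changelog (not Kafka code;
// window retention and expiry are ignored).
class ChangelogRestoreModel {

    // Local join state: fuel -> VINs of converted-car events held in the window.
    static final class StateStore {
        final Map<String, List<String>> byFuel = new HashMap<>();

        void put(String fuel, String vin) {
            byFuel.computeIfAbsent(fuel, k -> new ArrayList<>()).add(vin);
        }

        List<String> get(String fuel) {
            return byFuel.getOrDefault(fuel, List.of());
        }
    }

    // Each write goes to the local store and is also appended to the changelog.
    static void write(StateStore store, List<String[]> changelog, String fuel, String vin) {
        store.put(fuel, vin);
        changelog.add(new String[] {fuel, vin});
    }

    // A node taking over the partition replays the changelog into a fresh store.
    static StateStore restore(List<String[]> changelog) {
        StateStore fresh = new StateStore();
        for (String[] entry : changelog) {
            fresh.put(entry[0], entry[1]);
        }
        return fresh;
    }

    public static void main(String[] args) {
        List<String[]> changelog = new ArrayList<>();
        StateStore oldNode = new StateStore();
        write(oldNode, changelog, "gasoline", "ij"); // 07:25: car ij published

        StateStore newNode = restore(changelog);     // 08:25: partition moves to a new node
        System.out.println(newNode.get("gasoline")); // [ij]
    }
}
```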

Does this help you?

I verified that you are right: the windowed join state “lasts” across partition reassignment on restart.