I have two topics. The topic
car is compacted; the key is a VIN and the value is a fuel type, for example:
- ab: gasoline
- cd: gasoline
- ef: diesel
- gh: electricity
The topic fuel is a stream of price changes; the key is a fuel type and the value is a price, for example:
- gasoline: 10
- diesel: 12
- gasoline: 11
In Kafka Streams terms,
car is a KTable and
fuel is a KStream.
Is it possible to construct a topology which, for each fuel price change, produces events with the affected cars? In the given case, the change gasoline: 11 should produce events for cars ab and cd.
I read about the foreign-key join, producing something like
(fuelPrice, car) -> car.vin() + "_" + car.fuel() + "_" + fuelPrice.price()
But the foreign-key join works only as a KTable-KTable join, not a KStream-KTable join.
Is it possible to construct such a topology?
Off the top of my head, there are two options I can think of.
- On the
car KTable, use
KTable.toStream(KeyValueMapper), where the
KeyValueMapper inverts the structure of the record, i.e.
(key, value) -> KeyValue.pair(value, key), then perform the join
- Convert the
fuel stream to a table with
KStream.toTable(), then use the foreign-key join since you have two KTables
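Option 1 can be illustrated without Kafka at all. The sketch below is a hypothetical plain-Java simulation (class and method names are mine, not a Kafka Streams API): it inverts the car records from (vin -> fuel) to (fuel -> vins), which is what the KeyValueMapper inversion achieves, and then "joins" one fuel price event against that view.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InvertedJoinSketch {

    // Mimics KTable.toStream((key, value) -> KeyValue.pair(value, key)),
    // grouping the inverted records by their new key (the fuel type).
    static Map<String, List<String>> invert(Map<String, String> carTable) {
        Map<String, List<String>> byFuel = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : carTable.entrySet()) {
            byFuel.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        return byFuel;
    }

    // One fuel price event joined against the inverted view:
    // emits "vin_fuel_price" for every affected car.
    static List<String> join(Map<String, List<String>> byFuel, String fuel, int price) {
        List<String> out = new ArrayList<>();
        for (String vin : byFuel.getOrDefault(fuel, List.of())) {
            out.add(vin + "_" + fuel + "_" + price);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> cars = new LinkedHashMap<>();
        cars.put("ab", "gasoline");
        cars.put("cd", "gasoline");
        cars.put("ef", "diesel");
        cars.put("gh", "electricity");
        // gasoline: 11 affects cars ab and cd
        System.out.println(join(invert(cars), "gasoline", 11));
        // prints [ab_gasoline_11, cd_gasoline_11]
    }
}
```

In the real topology the inverted stream would be co-partitioned with the fuel stream by the new key, so each fuel price change meets exactly the cars using that fuel.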
Ad. 1: fuelStream.join(convertedTableStream, ...) is a KStream-KStream join and needs
JoinWindows. What should I use? I want to emit events immediately after a fuel price change arrives.
Ad. 2: When I call
fuelStream.toTable(), I may lose some price changes, since only the last event per key is kept. What if I want to process all events from fuelStream?
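The concern about toTable() can be made concrete. This is a plain-Java simulation (names are mine, not the Kafka Streams API) of table materialization semantics: a table keeps only the latest value per key, so an earlier update for the same key is overwritten.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ToTableSketch {

    record PriceEvent(String fuel, int price) {}

    // Materialize a stream of events into a table: last value per key wins.
    static Map<String, Integer> toTable(List<PriceEvent> stream) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (PriceEvent e : stream) {
            table.put(e.fuel(), e.price()); // overwrites gasoline: 10 with gasoline: 11
        }
        return table;
    }

    public static void main(String[] args) {
        List<PriceEvent> stream = List.of(
                new PriceEvent("gasoline", 10),
                new PriceEvent("diesel", 12),
                new PriceEvent("gasoline", 11));
        // The intermediate gasoline: 10 update is no longer visible.
        System.out.println(toTable(stream));
        // prints {gasoline=11, diesel=12}
    }
}
```

A downstream join against such a table sees only the latest price per fuel, which is exactly why option 2 cannot process every event from the fuel stream.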
Based on what you’re saying here, I’d try option one first: convert the table to a stream. For the
JoinWindows, set a duration that you think will be acceptable to make sure it captures both sides.
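The effect of the window choice can be sketched in plain Java (class and method names are mine, not a Kafka Streams API): two events join only if they share a key and their timestamps differ by at most the window duration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class WindowedJoinSketch {

    record Event(String key, String value, long tsMillis) {}

    // Join fuel events with converted-car events on equal keys,
    // keeping only pairs whose timestamps fall within the window.
    static List<String> windowedJoin(List<Event> fuel, List<Event> cars, Duration window) {
        List<String> out = new ArrayList<>();
        for (Event f : fuel) {
            for (Event c : cars) {
                if (f.key().equals(c.key())
                        && Math.abs(f.tsMillis() - c.tsMillis()) <= window.toMillis()) {
                    out.add(c.value() + "_" + f.key() + "_" + f.value());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> cars = List.of(new Event("gasoline", "ab", 0L));
        List<Event> fuel = List.of(
                new Event("gasoline", "11", Duration.ofHours(1).toMillis()),
                new Event("gasoline", "12", Duration.ofHours(30).toMillis()));
        // With a 24h window, the first price change matches car ab;
        // the second arrives too late and finds no partner.
        System.out.println(windowedJoin(fuel, cars, Duration.ofHours(24)));
        // prints [ab_gasoline_11]
    }
}
```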
Let’s say cars change rarely but the fuel price changes often.
If I convert the
car KTable into a stream, there will initially be 4 events:
- gasoline: ab
- gasoline: cd
- diesel: ef
- electricity: gh
Initially, converted-car events and fuel events may match within the JoinWindow. But as time passes, new fuel events arrive and they will not find partners on the
converted-car topic within the window.
The converted-car KStream is almost dead: new events arrive only when a new car is added or an old car is deleted, which is rare.
I get your point, but with the way the data is structured, you’re limited in your options. If the
fuel stream had the foreign key, you could convert the
KTable to a
GlobalKTable and then do a KStream-GlobalKTable join. The reason that would work is that the
KStream-GlobalKTable join takes a
KeyValueMapper parameter used to map each stream-side record of the join to the primary key of the table.
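A plain-Java sketch of that join direction (names are mine, not the Kafka Streams API): the KeyValueMapper derives the table’s primary key from each stream-side record. With this data it only works in the car-stream to fuel-price-table direction, because the foreign key (the fuel type) lives on the car side, so the join fires on car events rather than on fuel price changes.

```java
import java.util.Map;

public class GlobalTableLookupSketch {

    // streamKey = vin, streamValue = fuel; the mapper picks the table's PK.
    static String keyValueMapper(String vin, String fuel) {
        return fuel; // extract the foreign key from the stream side
    }

    // Look up the global fuel-price table with the extracted key.
    static String join(String vin, String fuel, Map<String, Integer> fuelPriceTable) {
        Integer price = fuelPriceTable.get(keyValueMapper(vin, fuel));
        return price == null ? null : vin + "_" + fuel + "_" + price;
    }

    public static void main(String[] args) {
        Map<String, Integer> fuelPrices = Map.of("gasoline", 11, "diesel", 12);
        // A car event triggers the lookup; a fuel price change alone would not.
        System.out.println(join("ab", "gasoline", fuelPrices));
        // prints ab_gasoline_11
    }
}
```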
Is it possible to update the fuel stream topic?
For the JoinWindows of the
fuel-converted-car join, you could set the window to a significant value, like 24 hours. Would that work?
“map each stream-side record to the primary key of the table”: that is syntax sugar for KStream
selectKey and does not solve my problem, because I want to use the foreign key from the table side.
Let’s say I transform the
car KTable into a
converted-car KStream and join it with the fuel KStream using a 24-hour window.
Say a new car is added at 07:25 and published as converted-car (gasoline: ij).
Then the fuel event gasoline: 13 comes at 07:55 and is matched with the new car ij.
What happens when a repartition occurs at 08:25? A new node takes over the gasoline partition of the topic
converted-car. Will the new node still match (gasoline: ij), or is the offset of the ij car committed, so the new node will match only new cars?
If there is a reassignment of partitions, the new Kafka Streams client will restore the state for the join’s state store from the changelog, so the new node will still match (gasoline: ij).
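The restore mechanism can be pictured with a plain-Java simulation (names are mine, not the Kafka Streams API): the join’s state store is backed by a changelog, and a node that takes over a partition starts with an empty store and replays the changelog to rebuild the buffered window entries.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogRestoreSketch {

    record ChangelogEntry(String key, String value, long tsMinutes) {}

    // The new node starts with an empty store and restores it from the changelog.
    static Map<String, ChangelogEntry> restore(List<ChangelogEntry> changelog) {
        Map<String, ChangelogEntry> store = new LinkedHashMap<>();
        for (ChangelogEntry e : changelog) {
            store.put(e.key() + "@" + e.tsMinutes(), e); // window entries keyed by key + timestamp
        }
        return store;
    }

    public static void main(String[] args) {
        // converted-car event (gasoline: ij) buffered at 07:25 (minute 445),
        // written to the changelog before the reassignment.
        List<ChangelogEntry> changelog =
                List.of(new ChangelogEntry("gasoline", "ij", 7 * 60 + 25));
        Map<String, ChangelogEntry> store = restore(changelog);
        // After reassignment, the restored store still holds the entry,
        // so a later gasoline price event can still find its partner.
        System.out.println(store.containsKey("gasoline@445"));
        // prints true
    }
}
```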
Does this help you?
I verified that you are right and the window join “lasts” after a reassignment of partitions on restart.