I have two topics. Topic `car` is compacted, the key is a VIN and the value is a fuel type, for example:
- ab: gasoline
- cd: gasoline
- ef: diesel
- gh: electricity
Topic `fuel` is a stream of price changes, the key is a fuel type and the value is a price, for example:
- gasoline: 10
- diesel: 12
- gasoline: 11
In Kafka Streams terms, `car` is a KTable and `fuel` is a KStream.
Is it possible to construct a topology which, for each fuel price change, produces events with the affected cars? In the given case:
Input: gasoline: 10
Output:
- ab-gasoline-10
- cd-gasoline-10

Input: diesel: 12
Output:
- ef-diesel-12

Input: gasoline: 11
Output:
- ab-gasoline-11
- cd-gasoline-11
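For reference, this is roughly how I read the two topics into the topology (String serdes and the literal topic names `car` and `fuel` are assumptions for the sketch):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// compacted topic: key = VIN, value = fuel type
KTable<String, String> carTable =
        builder.table("car", Consumed.with(Serdes.String(), Serdes.String()));

// change stream: key = fuel type, value = price
KStream<String, String> fuelStream =
        builder.stream("fuel", Consumed.with(Serdes.String(), Serdes.String()));
```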
I read about the foreign key join, like:

    fuelStream.join(
        carTable,
        Car::getFuel,
        (fuelPrice, car) -> car.vin() + "-" + car.fuel() + "-" + fuelPrice.price())

But the foreign key join works only on a KTable-KTable join, not a KStream-KTable join.
Is it possible to construct such a topology?
Hi @michaldo,
Off the top of my head, there are two options I can think of:

- On the `car` table, use `KTable.toStream(KeyValueMapper)` where the `KeyValueMapper` inverts the structure of the record, i.e. (key, value) -> KeyValue.pair(value, key), then perform the join `fuelStream.join(convertedTableStream, ....)` (sketched below).
- Convert the `fuel` stream to a table with `KStream.toTable()`, then use the foreign key join since you have two `KTable` instances (also sketched below).
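Rough sketches of both options, assuming the `carTable`/`fuelStream` definitions from your post and String serdes everywhere. Note that in option 1 I go through `toStream().map(...)` so the VIN is kept in the value after re-keying by fuel type, and the window duration is just a placeholder (`ofTimeDifferenceWithNoGrace` needs Kafka Streams 3.0+):

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.StreamJoined;

// Option 1: invert the car table into a stream keyed by fuel type,
// then do a windowed KStream-KStream join with the fuel price stream.
KStream<String, String> convertedTableStream = carTable
        .toStream()
        .map((vin, fuel) -> KeyValue.pair(fuel, vin + "-" + fuel)); // key = fuel, value = "vin-fuel"

KStream<String, String> affectedCars = fuelStream.join(
        convertedTableStream,
        (price, car) -> car + "-" + price,                               // e.g. "ab-gasoline-10"
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),  // placeholder window
        StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));

// Option 2: turn the fuel stream into a table and use the KTable-KTable
// foreign-key join; the car's value (its fuel type) is the foreign key.
KTable<String, String> fuelTable = fuelStream.toTable();

KTable<String, String> carWithPrice = carTable.join(
        fuelTable,
        fuel -> fuel,                          // FK extractor: car value -> fuel table key
        (fuel, price) -> fuel + "-" + price);  // result is keyed by VIN, value e.g. "gasoline-10"
```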
HTH,
Bill
Ad. 1: `fuelStream.join(convertedTableStream, ....)` is a KStream-KStream join and needs `JoinWindows`. What should I use? I want to emit events immediately after a fuel price change arrives.

Ad. 2: When I call `fuelStream.toTable()` I may miss some price changes - in a table only the latest event per key is kept. What if I want to process all events from `fuelStream`?
Based on what you're saying here, I'd try option one first - convert the table to a stream. For the `JoinWindows`, set it to a duration that you think will be acceptable to make sure it captures both sides of the join.
Let's say cars change rarely but the fuel price changes often.
If I convert the `car` KTable into a stream, there will initially be 4 events:
- gasoline: ab
- gasoline: cd
- diesel: ef
- electricity: gh

Initially, the converted-car events and the fuel events may match within the JoinWindows. But as time passes, fuel events keep arriving and will not find partners on the converted-car topic within the window, because the converted-car KStream is almost dead - new events arrive only when a new car is added or an old car is deleted, which means rarely.
I get your point, but with the way the data is structured, you're limited in your options.
If the `fuel` stream had the foreign key, you could convert the `car` KTable to a `GlobalKTable` and then do a KStream-GlobalKTable join. The reason that would work is that the KStream-GlobalKTable join takes a `KeyValueMapper` parameter used to extract the foreign key from the stream side of the join to the PK of the table.
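Just to illustrate the shape of that API - a purely hypothetical sketch, assuming for the example that each fuel record's value also carried a VIN, e.g. key = gasoline, value = "ab:10" (which is not the case today):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;

GlobalKTable<String, String> carGlobalTable =
        builder.globalTable("car", Consumed.with(Serdes.String(), Serdes.String()));

KStream<String, String> joined = fuelStream.join(
        carGlobalTable,
        // keySelector: pull the table's primary key (VIN) out of the stream record
        (fuelType, vinAndPrice) -> vinAndPrice.split(":")[0],
        // joiner: combine the stream value with the looked-up car value (its fuel type)
        (vinAndPrice, carFuel) ->
                vinAndPrice.split(":")[0] + "-" + carFuel + "-" + vinAndPrice.split(":")[1]);
```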
Is it possible to update the `fuel` stream topic?
For the `JoinWindows` of the fuel / converted-car join, you could set the join window to a significant value like 24 hours - would that work?
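For instance (a sketch; `ofTimeDifferenceWithNoGrace` assumes Kafka Streams 3.0+):

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

// 24-hour symmetric join window, no grace period for late records
JoinWindows dayWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24));
```

That `dayWindow` would replace the placeholder window in the option 1 sketch above.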
> extract foreign key from the stream-side

That is syntactic sugar for `KStream#selectKey` and does not solve my problem, because I want to use the foreign key from the table side.
Let's say I transform the `car` KTable into a converted-car KStream and join it with the `fuel` KStream using a 24-hour window.
Let's say a new car is added at 07:25 and published as converted-car (gasoline: ij).
Then the fuel event gasoline: 13 comes at 07:55 and is matched with the new car ij.
What happens when a partition reassignment occurs at 08:25? A new node takes over the gasoline partition of the converted-car topic. Will the new node still match (gasoline: ij), or is the offset of the ij car committed so that the new node will match only new cars?
If there is a reassignment of partitions, the new Kafka Streams client will restore the state for the join's state store from the changelog, so the new node will still match gasoline: ij.
Does this help you?
I verified that you are right and the windowed join state "survives" a reassignment of partitions on restart.