Hi everyone!
I’m implementing an application using Kafka Streams and I have the following scenario:
Topic orders
{"id":100}:{"id":100, "clientId":1, "statusId"=1}
{"id":101}:{"id":101, "clientId":2, "statusId"=1}
{"id":102}:{"id":102, "clientId":3, "statusId"=1}
Create a KStream from topic
final KStream<Integer, OrderSource> orderStream = builder
.stream("orders", Consumed.with(CustomSerdesFactory.keySerde(), CustomSerdesFactory.orderSerde()))
.map((key, value) -> new KeyValue<>(value.getId(), value));
KStream orders result
[KStream]: 100, OrderSource(id=100, statusId=1, clientId=1)
[KStream]: 101, OrderSource(id=101, statusId=1, clientId=2)
[KStream]: 102, OrderSource(id=102, statusId=1, clientId=3)
I would like to enrich this stream with data stored in a KTable.
This KTable is generated from other topic:
Topic clients
{"id":1}:{"id":1, "name":"name1", "email"="name1@test.com"}
{"id":2}:{"id":2, "name":"name2", "email"="name2@test.com"}
{"id":3}:{"id":3, "name":"name3", "email"="name3@test.com"}
Create a KTable from topic
final KTable<Integer, ClientSource> clientTable = builder
.stream("clients", Consumed.with(CustomSerdesFactory.keySerde(), CustomSerdesFactory.clientSerde()))
.map((key, value) -> new KeyValue<>(value.getId(), value))
.toTable(Materialized.<Integer, ClientSource, KeyValueStore<Bytes, byte[]>>as("client-table")
.withKeySerde(Serdes.Integer())
.withValueSerde(CustomSerdesFactory.clientSerde())
);
KTable client result
[KTable]: 1, ClientSource(id=1, name=name1, email=name1@test.com)
[KTable]: 2, ClientSource(id=2, name=name2, email=name2@test.com)
[KTable]: 3, ClientSource(id=3, name=name3, email=name3@test.com)
The expected output would be a KStream like this:
[KStream]: 100, OrderEnrichedSource(id=100, statusId=1, name=name1, email=name1@test.com)
[KStream]: 101, OrderEnrichedSource(id=101, statusId=1, name=name2, email=name2@test.com)
[KStream]: 102, OrderEnrichedSource(id=102, statusId=1, name=name3, email=name3@test.com)
First of all, is it possible to make a join (or leftjoin) that allows this? I have read the documentation and I don’t see a similar example, although I may not be understanding the concepts well.
If possible, how would this join be done in Java code?
Regards friends =)