KStream-KTable join: enriching a stream with KTable data

Hi everyone!

I’m implementing an application using Kafka Streams and I have the following scenario:

Topic orders

{"id":100}:{"id":100, "clientId":1, "statusId":1}
{"id":101}:{"id":101, "clientId":2, "statusId":1}
{"id":102}:{"id":102, "clientId":3, "statusId":1}

Create a KStream from topic

final KStream<Integer, OrderSource> orderStream = builder
        .stream("orders", Consumed.with(CustomSerdesFactory.keySerde(), CustomSerdesFactory.orderSerde()))
        .map((key, value) -> new KeyValue<>(value.getId(), value));

KStream orders result

[KStream]: 100, OrderSource(id=100, statusId=1, clientId=1)
[KStream]: 101, OrderSource(id=101, statusId=1, clientId=2)
[KStream]: 102, OrderSource(id=102, statusId=1, clientId=3)

I would like to enrich this stream with data stored in a KTable.

This KTable is generated from another topic:

Topic clients

{"id":1}:{"id":1, "name":"name1", "email":"name1@test.com"}
{"id":2}:{"id":2, "name":"name2", "email":"name2@test.com"}
{"id":3}:{"id":3, "name":"name3", "email":"name3@test.com"}

Create a KTable from topic

final KTable<Integer, ClientSource> clientTable = builder
        .stream("clients", Consumed.with(CustomSerdesFactory.keySerde(), CustomSerdesFactory.clientSerde()))
        .map((key, value) -> new KeyValue<>(value.getId(), value))
        .toTable(Materialized.<Integer, ClientSource, KeyValueStore<Bytes, byte[]>>as("client-table")
          .withKeySerde(Serdes.Integer())
          .withValueSerde(CustomSerdesFactory.clientSerde())
        );

KTable client result

[KTable]: 1, ClientSource(id=1, name=name1, email=name1@test.com)
[KTable]: 2, ClientSource(id=2, name=name2, email=name2@test.com)
[KTable]: 3, ClientSource(id=3, name=name3, email=name3@test.com)

The expected output would be a KStream like this:

[KStream]: 100, OrderEnrichedSource(id=100, statusId=1, name=name1, email=name1@test.com)
[KStream]: 101, OrderEnrichedSource(id=101, statusId=1, name=name2, email=name2@test.com)
[KStream]: 102, OrderEnrichedSource(id=102, statusId=1, name=name3, email=name3@test.com)

First of all, is it possible to do a join (or leftJoin) that allows this? I have read the documentation and I don’t see a similar example, although I may not be understanding the concepts well.

If possible, how would this join be done in Java code?

Regards friends =)

Joins happen on the key, so if you want to enrich your stream records with table lookups, you need to set the client ID as the key of the stream so that it can be joined against the table key.

final KStream<Integer, OrderSource> orderStream = builder
        .stream("orders", Consumed.with(CustomSerdesFactory.keySerde(), CustomSerdesFactory.orderSerde()))
        .selectKey((key, value) -> value.getClientId()); // easier to just use selectKey() as you don't want to modify the value anyway

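// The join triggers a repartition because the key was changed by selectKey().
// If the configured default serdes do not match, pass
// Joined.with(Serdes.Integer(), CustomSerdesFactory.orderSerde(), CustomSerdesFactory.clientSerde())
// as a third argument to join().
final KStream<Integer, OrderEnrichedSource> enrichedOrderStream =
    orderStream.join(
        clientTable,
        (order, client) -> new OrderEnrichedSource(
            order.getId(),
            order.getStatusId(), // assumes the getter follows the statusId field name
            client.getName(),
            client.getEmail()
        )
    );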

final KStream<Integer, OrderEnrichedSource> result =
    enrichedOrderStream.selectKey((k, v) -> v.getId());

After the join, you can apply an additional selectKey() to put the order ID back into the key.
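Since the question also asks about join versus leftJoin, here is a rough plain-Java sketch of what the two do in the re-key, look up, re-key pattern above. It does not use the Kafka Streams API at all: the table is simulated with a Map, and the class name, the records (Java 16+), and the extra order 103 with an unknown client are all made up for illustration. With join(), an order whose client ID is not in the table produces no output; with leftJoin(), the order is still emitted and the joiner receives null for the client.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in types; not the OrderSource/ClientSource classes from the question.
public class JoinSemanticsSketch {
    public record Order(int id, int clientId, int statusId) {}
    public record Client(int id, String name, String email) {}
    public record EnrichedOrder(int id, int statusId, String name, String email) {}

    // Mimics join(): orders whose clientId has no table entry are dropped.
    public static Map<Integer, EnrichedOrder> innerJoin(List<Order> orders, Map<Integer, Client> clients) {
        Map<Integer, EnrichedOrder> result = new LinkedHashMap<>();
        for (Order order : orders) {
            Client client = clients.get(order.clientId()); // lookup by the join key (client ID)
            if (client != null) {
                // Result is keyed by order ID again, like the final selectKey().
                result.put(order.id(),
                        new EnrichedOrder(order.id(), order.statusId(), client.name(), client.email()));
            }
        }
        return result;
    }

    // Mimics leftJoin(): every order is kept; client fields are null when there is no match.
    public static Map<Integer, EnrichedOrder> leftJoin(List<Order> orders, Map<Integer, Client> clients) {
        Map<Integer, EnrichedOrder> result = new LinkedHashMap<>();
        for (Order order : orders) {
            Client client = clients.get(order.clientId());
            String name = client == null ? null : client.name();
            String email = client == null ? null : client.email();
            result.put(order.id(), new EnrichedOrder(order.id(), order.statusId(), name, email));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Client> clients = Map.of(
                1, new Client(1, "name1", "name1@test.com"),
                2, new Client(2, "name2", "name2@test.com"));
        List<Order> orders = List.of(
                new Order(100, 1, 1),
                new Order(101, 2, 1),
                new Order(103, 9, 1)); // hypothetical order whose client is not in the table

        System.out.println(innerJoin(orders, clients).keySet()); // [100, 101]
        System.out.println(leftJoin(orders, clients).keySet());  // [100, 101, 103]
    }
}
```

In the real topology the same choice is just join(...) versus leftJoin(...) on the re-keyed stream; with leftJoin the ValueJoiner has to handle a null client.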