How to perform KTable-KStream join

Hi Folks,

I am new to Kafka. I was learning about joins.

I have a consumer that reads from a topic and extracts a payload in JSON format.

The payload looks like this:

{
"WorkerActivityName": "Available",
"EventType": "worker.activity.update",
"ResourceType": "worker",
"WorkerTimeInPreviousActivityMs": "237",
"Timestamp": "1626114642",
"WorkerActivitySid": "WAc9030ef021bc1786d3ae11544f4d9883",
"WorkerPreviousActivitySid": "WAf4feb231e97c1878fecc58b26fdb95f3",
"WorkerTimeInPreviousActivity": "0",
"AccountSid": "AC8c5cd8c9ba538090da104b26d68a12ec",
"WorkerName": "Dorothy.Finegan@Copart.Com",
"Sid": "EV284c8a8bc27480e40865263f0b42e5cf",
"TimestampMs": "1626114642204",
"WorkerSid": "WKe638256376188fab2a98cccb3c803d67",
"WorkspaceSid": "WS38b10d521442ecb74fcc263d5a4d726e",
"WorkspaceName": "Copart-MiPhone",
"WorkerPreviousActivityName": "Unavailable(RNA)",
"EventDescription": "Worker Dorothy.Finegan@Copart.Com updated to Available Activity",
"ResourceSid": "WKe638256376188fab2a98cccb3c803d67",
"WorkerAttributes": "{\"miphone_dept\":[\"USA_YRD_OPS\"],\"languages\":[\"en\"],\"home_region\":\"GL\",\"roles\":[\"supervisor\"],\"miphone_yards\":[\"81\"],\"miphone_enabled\":true,\"miphone_states\":[\"IL\"],\"home_state\":\"IL\",\"skills\":[\"YD_SELLER\",\"YD_TITLE\"],\"home_division\":\"Northern\",\"miphone_divisions\":[\"Northern\"],\"miphone_functions\":[\"outbound_only\"],\"full_name\":\"Dorothy Finegan\",\"miphone_regions\":[\"GL\"],\"home_country\":\"USA\",\"copart_user_id\":\"USA3204\",\"home_yard\":\"81\",\"home_dept\":\"USA_YRD_OPS\",\"email\":\"dorothy.finegan@copart.com\",\"home_dept_category\":\"OPS\",\"contact_uri\":\"client:Dorothy_2EFinegan_40Copart_2ECom\",\"queue_activity\":\"Available\",\"teams\":[],\"remote_employee\":false,\"miphone_call_center_units\":[\"USA_YRD_OPS|81\"],\"miphone_call_center_teams\":[]}"
}

I want to create a KTable whose key is the WorkspaceSid field (for example "WS38b10d521442ecb74fcc263d5a4d726e") and whose value is any of the other fields.

So far, I have only managed to create a table directly from the topic, using this code:

KTable<String, String> kTable = builder.table("testqa2", Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryableKTable")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

Please can anyone help me out?

If you use builder.table(), the message key is used as the table key and the message value as the table value. Recall that Kafka uses a key-value data model.

In your case, you want to use a field from the message value as the table key, so you cannot use the table() method directly. Instead, first read the topic as a KStream via builder.stream(), set the desired key (for example with KStream#selectKey()), and then convert the stream into a table via KStream#toTable().
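To make the key-value model concrete, here is a minimal, Kafka-free model of what a table does with the messages it reads. It is an illustration only, not the Kafka Streams implementation: each message is an upsert for its key, a null value (a tombstone) deletes the key, and records with a null key are skipped. The `Message` record and the `materialize` method are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TableSemantics {
    // Hypothetical stand-in for a Kafka message: a key plus a value.
    public record Message(String key, String value) {}

    // Models reading a topic into a table: the latest value per message key wins.
    public static Map<String, String> materialize(List<Message> messages) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Message m : messages) {
            if (m.key() == null) {
                continue;                 // records without a key are skipped
            }
            if (m.value() == null) {
                table.remove(m.key());    // tombstone: delete the key
            } else {
                table.put(m.key(), m.value()); // upsert: overwrite the previous value
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Message> topic = List.of(
                new Message("k1", "{\"WorkerActivityName\": \"Unavailable\"}"),
                new Message("k1", "{\"WorkerActivityName\": \"Available\"}"),
                new Message("k2", null));
        System.out.println(materialize(topic));
    }
}
```

Because the table is keyed by whatever the message key happens to be, a table keyed by WorkspaceSid requires changing the key before the data reaches the table.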

Something like this:

KTable<String, String> table = builder.stream(...).selectKey(...).toTable(...);
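A fuller sketch of that chain, assuming Kafka Streams 2.5.0 or later is on the classpath and that the topic `testqa2` has String keys and JSON String values, as in the question. The `extractWorkspaceSid` helper uses a regular expression only to keep the example dependency-free; a real application would more likely parse the JSON with a proper library or a JSON Serde. The store name `workspace-table`, the application id, and `localhost:9092` are placeholders.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class WorkspaceTable {
    // Simplification (hypothetical helper): grab the top-level "WorkspaceSid" string
    // with a regex instead of a real JSON parser. Returns null if the field is absent.
    private static final Pattern WORKSPACE_SID =
            Pattern.compile("\"WorkspaceSid\"\\s*:\\s*\"([^\"]*)\"");

    public static String extractWorkspaceSid(String json) {
        Matcher m = WORKSPACE_SID.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table = builder
                .stream("testqa2", Consumed.with(Serdes.String(), Serdes.String()))
                // Drop records we cannot derive a key from.
                .filter((key, value) -> value != null && extractWorkspaceSid(value) != null)
                // Replace the message key with the WorkspaceSid taken from the value.
                .selectKey((key, value) -> extractWorkspaceSid(value))
                // toTable() needs Kafka Streams 2.5.0+; the Materialized names the store.
                .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("workspace-table")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "workspace-table-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Fallback serdes for any step that does not set them explicitly.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

Because selectKey() changes the key, Kafka Streams creates an internal repartition topic before building the table, so that all records with the same WorkspaceSid end up in the same partition.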

Hey,

While using this:

KTable<String, String> table = builder.stream(...).selectKey(...).toTable(...);

I keep running into an error saying that toTable is not present in KStream:

Cannot resolve method ‘toTable’ in ‘KStream’

My line of code is:

KTable<String, String> abc = orders.selectKey((key, value) -> Worker.getWorkerSid()).toTable("rekeyed-topic");

Any idea what the issue is?

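Maybe you are using an older version of Kafka Streams? toTable() was added in version 2.5.0. Also note that toTable() does not take a topic name: its overloads accept a Named and/or a Materialized, so toTable("rekeyed-topic") will not compile even on 2.5.0 or later. If you want to name the backing store, pass a Materialized such as Materialized.as("rekeyed-store").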

If you are indeed on an older version, you can create a new topic first, write the re-keyed stream into this topic and read it back as a table:

builder.stream(...).selectKey(...).to("newTopic", ...);
KTable<String, String> table = builder.table("newTopic", ...);
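A minimal sketch of that workaround for Kafka Streams versions before 2.5.0, assuming String keys and JSON String values on `testqa2`. The intermediate topic name `testqa2-by-workspace` and the store name `workspace-table` are hypothetical; Kafka Streams only creates its own internal topics, so you would normally create the intermediate topic yourself beforehand. WorkspaceSid is extracted with a simple regular expression rather than a real JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class WorkspaceTableLegacy {
    // Simplification (hypothetical helper): regex lookup of the top-level "WorkspaceSid".
    private static final Pattern WORKSPACE_SID =
            Pattern.compile("\"WorkspaceSid\"\\s*:\\s*\"([^\"]*)\"");

    public static String extractWorkspaceSid(String json) {
        Matcher m = WORKSPACE_SID.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Step 1: re-key the stream and write it to the pre-created intermediate topic.
        builder.stream("testqa2", Consumed.with(Serdes.String(), Serdes.String()))
               .filter((key, value) -> value != null && extractWorkspaceSid(value) != null)
               .selectKey((key, value) -> extractWorkspaceSid(value))
               .to("testqa2-by-workspace", Produced.with(Serdes.String(), Serdes.String()));

        // Step 2: read the re-keyed topic back as a table keyed by WorkspaceSid.
        KTable<String, String> table = builder.table(
                "testqa2-by-workspace",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("workspace-table"));

        return builder.build();
    }

    public static void main(String[] args) {
        // Print the topology; building it does not need a running broker.
        System.out.println(buildTopology().describe());
    }
}
```

Writing through a topic you manage yourself is essentially what toTable() automates in 2.5.0 and later via its internal repartition topic.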