I’ve been learning the Streams API by working through the Kafka Streams 101 course, and I’m having trouble with the StreamsJoin example featured in the “Hands On: Joins” section. The current source for that example is here. I’ve modified the example to add some logging so I can troubleshoot the issue.
The example joins two KStreams (Appliance, Electronic), then left-joins the resulting stream against a KTable of Users, using an “enrichmentJoiner” that adds the user’s name whenever the user is non-null:
ValueJoiner<CombinedOrder, User, CombinedOrder> enrichmentJoiner = (combined, user) -> {
    if (user != null) {
        combined.setUserName(user.getName());
    } else {
        LOG.debug("user is null");
    }
    return combined;
};
The issue is that when the left join against the userTable occurs, it appears to have no data, so the User passed to the enrichmentJoiner is always null.
By adding this line, I can see that there is roughly a 30-second lag before the KTable actually contains anything:
userTable.toStream().peek((key, value) -> LOG.debug("User record - key " + key + " value " + value));
By the time data actually shows up in the userTable in a usable way, it’s too late for the leftJoin that needed that data.
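If I understand the semantics correctly, the behavior I’m seeing boils down to something like this plain-Java sketch (this is NOT Kafka Streams code; the class and method names are mine, just to illustrate the timing problem): a stream-table join only sees whatever the table side holds at the moment the stream record is processed, and there is no buffering or retry for the stream record once it has gone through.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: models a KTable as "latest value per key, as of now",
// and a stream-table leftJoin as a point-in-time lookup into that map.
public class StreamTableJoinTiming {

    // The "table" side: latest user name keyed by user id.
    static final Map<String, String> userTable = new HashMap<>();

    // The "join": look up the table at the instant the stream record arrives.
    // Returns null when the table side hasn't been populated yet.
    static String leftJoinLookup(String userId) {
        return userTable.get(userId);
    }

    public static void main(String[] args) {
        // Stream record arrives before the table has loaded -> user is null.
        System.out.println("before table load: " + leftJoinLookup("10261998"));

        // Table record arrives ~30s later -> too late for the earlier join;
        // the already-emitted result is not revisited.
        userTable.put("10261998", "Elizabeth Jones");
        System.out.println("after table load: " + leftJoinLookup("10261998"));
    }
}
```

That matches the log output: the Stream-Table Join lines fire with `user is null`, and the User records only show up half a minute later.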
Here’s the output. Note the 30 second gap before the last two lines.
10:39:28.095 DEBUG TopicLoader - Record produced - offset - 0 timestamp - 1636565967917
10:39:28.095 DEBUG TopicLoader - Record produced - offset - 1 timestamp - 1636565968074
10:39:28.238 DEBUG TopicLoader - Record produced - offset - 0 timestamp - 1636565968214
10:39:28.259 DEBUG TopicLoader - Record produced - offset - 1 timestamp - 1636565968230
10:39:28.394 DEBUG TopicLoader - Record produced - offset - 0 timestamp - 1636565968365
10:39:28.395 DEBUG TopicLoader - Record produced - offset - 1 timestamp - 1636565968385
10:39:29.291 DEBUG StreamsJoin - Appliance stream incoming record key 10261998 value {"order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_id": "10261998", "time": 1636565967598}
10:39:29.343 DEBUG StreamsJoin - Appliance stream incoming record key 10261999 value {"order_id": "remodel-2", "appliance_id": "stove-2333", "user_id": "10261999", "time": 1636565967599}
10:39:29.344 DEBUG StreamsJoin - Electronic stream incoming record 10261998 value {"order_id": "remodel-1", "electronic_id": "television-2333", "user_id": "10261998", "price": 0.0, "time": 1636565967601}
10:39:29.349 DEBUG StreamsJoin - Stream-Stream Join record key 10261998 value {"electronic_order_id": "remodel-1", "appliance_order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_name": "", "time": 1636565969348}
10:39:29.350 DEBUG StreamsJoin - user is null
10:39:29.350 DEBUG StreamsJoin - Stream-Table Join record key 10261998 value {"electronic_order_id": "remodel-1", "appliance_order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_name": "", "time": 1636565969348}
10:39:29.663 DEBUG StreamsJoin - Electronic stream incoming record 10261999 value {"order_id": "remodel-2", "electronic_id": "laptop-5333", "user_id": "10261999", "price": 0.0, "time": 1636565967612}
10:39:29.663 DEBUG StreamsJoin - Stream-Stream Join record key 10261999 value {"electronic_order_id": "remodel-2", "appliance_order_id": "remodel-2", "appliance_id": "stove-2333", "user_name": "", "time": 1636565969663}
10:39:29.663 DEBUG StreamsJoin - user is null
10:39:29.663 DEBUG StreamsJoin - Stream-Table Join record key 10261999 value {"electronic_order_id": "remodel-2", "appliance_order_id": "remodel-2", "appliance_id": "stove-2333", "user_name": "", "time": 1636565969663}
10:39:59.262 DEBUG StreamsJoin - User record - key 10261998 value {"name": "Elizabeth Jones", "address": "5405 6th Avenue", "user_id": "10261998"}
10:39:59.262 DEBUG StreamsJoin - User record - key 10261999 value {"name": "Art Vandelay", "address": "407 64th Street", "user_id": "10261999"}
How can I adjust the way this example initializes everything so that the users KTable is “ready” in time for the join that uses it? Sorry, I don’t yet understand enough about KTables to ask that question in a better way.