Troubleshooting StreamsJoin example in Kafka Streams 101 course

I’ve been learning the Streams API by working through the Kafka Streams 101 course, and I’m having trouble with the StreamsJoin example featured in the “Hands On: Joins” section. The current source for that example is here. I’ve modified the example to add some logging to troubleshoot the issue I’m having.

The example joins two KStreams (Appliance and Electronic orders), then left-joins the resulting stream against a KTable of Users, using an “enrichmentJoiner” to add the user’s name when the user isn’t null:

        ValueJoiner<CombinedOrder, User, CombinedOrder> enrichmentJoiner = (combined, user) -> {
            if (user != null) {
                combined.setUserName(user.getName());
            } else {
                LOG.debug("user is null");
            }
            return combined;
        };
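For context, here’s roughly how the joins are wired up (my paraphrase of the course example rather than a verbatim copy; the window size, serde names, and variable names below are illustrative):

        KStream<String, CombinedOrder> combinedStream = applianceStream.join(
                electronicStream,
                orderJoiner,   // ValueJoiner<ApplianceOrder, ElectronicOrder, CombinedOrder>
                JoinWindows.of(Duration.ofMinutes(30)),
                StreamJoined.with(Serdes.String(), applianceSerde, electronicSerde));

        combinedStream
                .leftJoin(userTable,
                          enrichmentJoiner,
                          Joined.with(Serdes.String(), combinedSerde, userSerde))
                .peek((key, value) -> LOG.debug("Stream-Table Join record key " + key + " value " + value));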

The issue is that when the left join against the userTable occurs, the table appears to have no data, so the User passed to the enrichmentJoiner is always null.

By adding this line, I can see that there is roughly a 30-second lag before the KTable actually contains anything:

        userTable.toStream().peek((key, value) -> LOG.debug("User record - key " + key +" value " + value));

By the time data actually shows up in the userTable in a usable way, it’s too late for the leftJoin that needed it.

Here’s the output. Note the 30-second gap before the last two lines.

10:39:28.095 DEBUG TopicLoader - Record produced - offset - 0 timestamp - 1636565967917
10:39:28.095 DEBUG TopicLoader - Record produced - offset - 1 timestamp - 1636565968074
10:39:28.238 DEBUG TopicLoader - Record produced - offset - 0 timestamp - 1636565968214
10:39:28.259 DEBUG TopicLoader - Record produced - offset - 1 timestamp - 1636565968230
10:39:28.394 DEBUG TopicLoader - Record produced - offset - 0 timestamp - 1636565968365
10:39:28.395 DEBUG TopicLoader - Record produced - offset - 1 timestamp - 1636565968385
10:39:29.291 DEBUG StreamsJoin - Appliance stream incoming record key 10261998 value {"order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_id": "10261998", "time": 1636565967598}
10:39:29.343 DEBUG StreamsJoin - Appliance stream incoming record key 10261999 value {"order_id": "remodel-2", "appliance_id": "stove-2333", "user_id": "10261999", "time": 1636565967599}
10:39:29.344 DEBUG StreamsJoin - Electronic stream incoming record 10261998 value {"order_id": "remodel-1", "electronic_id": "television-2333", "user_id": "10261998", "price": 0.0, "time": 1636565967601}
10:39:29.349 DEBUG StreamsJoin - Stream-Stream Join record key 10261998 value {"electronic_order_id": "remodel-1", "appliance_order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_name": "", "time": 1636565969348}
10:39:29.350 DEBUG StreamsJoin - user is null
10:39:29.350 DEBUG StreamsJoin - Stream-Table Join record key 10261998 value {"electronic_order_id": "remodel-1", "appliance_order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_name": "", "time": 1636565969348}
10:39:29.663 DEBUG StreamsJoin - Electronic stream incoming record 10261999 value {"order_id": "remodel-2", "electronic_id": "laptop-5333", "user_id": "10261999", "price": 0.0, "time": 1636565967612}
10:39:29.663 DEBUG StreamsJoin - Stream-Stream Join record key 10261999 value {"electronic_order_id": "remodel-2", "appliance_order_id": "remodel-2", "appliance_id": "stove-2333", "user_name": "", "time": 1636565969663}
10:39:29.663 DEBUG StreamsJoin - user is null
10:39:29.663 DEBUG StreamsJoin - Stream-Table Join record key 10261999 value {"electronic_order_id": "remodel-2", "appliance_order_id": "remodel-2", "appliance_id": "stove-2333", "user_name": "", "time": 1636565969663}
10:39:59.262 DEBUG StreamsJoin - User record - key 10261998 value {"name": "Elizabeth Jones", "address": "5405 6th Avenue", "user_id": "10261998"}
10:39:59.262 DEBUG StreamsJoin - User record - key 10261999 value {"name": "Art Vandelay", "address": "407 64th Street", "user_id": "10261999"}

How can I adjust the way this example initializes everything so that the users KTable is “ready” in time for the join that uses it? Sorry, I don’t yet understand enough about KTables to ask that question in a better way.

[Why did someone flag this post as spam? It clearly isn’t.]

Try setting a small commit interval in the Kafka Streams application:

            Map.entry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L)

For example, you can see how I apply it here: kafka-streams-dashboards/Streams.java at 7319be8f7845dbe8aafd384a67cda6643d399c95 · nbuesing/kafka-streams-dashboards · GitHub

The default commit interval is 30 s when at-least-once semantics is in use (the default). For typical production processing this can work out quite well, but for demonstrations and for seeing how an application behaves, I find that reducing it gives a better indication of what is going on.
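In plain properties form that looks something like this (a sketch; the application id and bootstrap servers are placeholders for whatever your setup uses):

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-join-demo");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Commit every 100 ms instead of the 30 s at-least-once default; commits also
        // flush the record caches, which is when KTable updates become visible downstream.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);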

I haven’t looked at this code recently, but when someone says there is a 30 s delay, this is usually the area to look into.

Thanks for the quick reply. That’s helpful. However, changing it just reduced the gap from 30 s to 100 ms; the data still isn’t available to the join when it occurs. Is there some way to ensure that the table is fully materialized from the underlying topic before the join uses it? I wouldn’t even know where to try a sleep hack in this scenario, since the whole topology is being orchestrated by KafkaStreams.

I should add that I’m not using Confluent Cloud to run the example. I’m using a local single-broker Docker-based Kafka instance created with the cp-all-in-one-community Docker Compose file.

I figured out the issue. The example code’s TopicLoader populates the topic backing the KTable (users) after it populates the topics backing the KStreams (appliances, electronics), so the KTable records have timestamps later than those of the KStream records. It turns out that a stream-table join will not join a stream record with a table record whose timestamp is later than the stream record’s. This isn’t clear from the documentation. All I had to do to fix it was modify the TopicLoader to populate the users topic first.
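In other words, the fix is just reordering the produce calls in the TopicLoader so the user records get the earliest timestamps (a sketch; these helper names are hypothetical, not the actual course code):

        // Populate users first so the table records carry timestamps earlier than the
        // order records. The stream-table join is temporal: a stream record only joins
        // against table records whose timestamps are at or before its own.
        sendUserRecords(producer);        // hypothetical helper: writes the users topic
        sendApplianceOrders(producer);    // hypothetical helper: writes the appliances topic
        sendElectronicOrders(producer);   // hypothetical helper: writes the electronics topic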


Yeah, sorry about that, the forum software can be a bit zealous at times. It was triggered by “This new user tried to create multiple posts with links to the same domain.” Should all be fixed now 🙂


Thanks for the response!

Check out the Kafka Summit talk “Temporal Joins in Kafka Streams and ksqlDB” for more details.

This talk is very helpful! Thank you, @mjsax.

