Kafka Streams Outer Join - old values are being aggregated

I have the following simple Kafka Streams application:

```
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SimpleStream {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int partitionSize = 1;
        // Define the Kafka Streams configuration properties
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, partitionSize);
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

        // Create the topics if they do not already exist
        AdminClient admin = AdminClient.create(props);
        NewTopic inputTopic1 = new NewTopic("input-topic-1", partitionSize, (short) 1);
        NewTopic inputTopic2 = new NewTopic("input-topic-2", partitionSize, (short) 1);
        NewTopic outputTopic = new NewTopic("output-topic", partitionSize, (short) 1);
        admin.createTopics(Arrays.asList(inputTopic1, inputTopic2, outputTopic)).all().get();
        admin.close();

        // Create a Kafka Streams builder object
        StreamsBuilder builder = new StreamsBuilder();

        // Define the input topics and create corresponding KStream objects
        KStream<String, String> stream1 = builder.stream("input-topic-1");
        KStream<String, String> stream2 = builder.stream("input-topic-2");

        // Join the two input streams using the outerJoin() method
        KStream<String, String> joined = stream1.outerJoin(
                        stream2,
                        (value1, value2) -> {
                            if (value1 == null && value2 != null) {
                                return value2;
                            } else if (value2 == null && value1 != null) {
                                return value1;
                            } else if (value1 != null) {
                                return value1 + "-" + value2;
                            } else {
                                return "";
                            }
                        },
                        JoinWindows.of(Duration.ofSeconds(15))
                ).groupByKey()
                .reduce((value1, value2) -> value1 + " + " + value2).toStream();

        // Write the concatenated records to the output topic using the to() method
        joined.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        // Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```

When I insert the following messages during the first window, I get the expected result:
1. Message to left topic: Key: 0, Value: 1
2. Message to right topic: Key: 0, Value: 2
3. Message to left topic: Key: 0, Value: 3
4. Message to right topic: Key: 0, Value: 4

Result: 1 + 2-1 + 2-3 + 4-1 + 4-3

Now, when the window ends and I insert new messages, the old result ("1 + 2-1 + 2-3 + 4-1 + 4-3") is always aggregated in as well.

Let's say I insert the following message to the left topic: Key: 0, Value: 5. I'll get this result:
1 + 2-1 + 2-3 + 4-1 + 4-3 + 5, but I would expect to get only "5", since it was sent more than a minute after the previous result.

What am I missing here?

Hi,

What’s happening here is that you have a reduce after the join, and the reduce is stateful. The join itself emits the correct result, since the 5 arrives by itself. But when the new record 0:5 goes through the reduce, the reduce finds the previous result for key 0 and appends the 5. So what you’re seeing is the previous state for that key, retrieved from a state store, which is exactly what’s expected of stateful operations in Kafka Streams.
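
To picture what the reduce is doing, here's a minimal model of it in plain Java (the class name is made up for illustration, and the HashMap stands in for the RocksDB-backed state store that Streams actually manages):

```
import java.util.HashMap;
import java.util.Map;

// Conceptual model only: an unwindowed reduce keeps one accumulator per key, forever.
class UnwindowedReduceModel {
    private final Map<String, String> store = new HashMap<>();

    String apply(String key, String value) {
        // merge() runs the reducer against whatever is already stored for this key,
        // no matter how long ago that previous aggregate was written
        return store.merge(key, value, (oldAgg, v) -> oldAgg + " + " + v);
    }
}
```

Feed it key 0 with the old aggregate and then key 0 with "5", and the second call returns the old aggregate with " + 5" appended, which is exactly the output you're seeing.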

If you want to limit the results by time, you could amend your application to this:

```
stream1.outerJoin(...)
       .groupByKey()
       .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(15)))
       .reduce(...)
```
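
For completeness, here's one way that could be wired into your original topology (a sketch, not tested; `joinResult` stands for the KStream returned by `stream1.outerJoin(...)`, and the final `map` is there because a windowed reduce keys its output by `Windowed<String>`):

```
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.TimeWindows;

// joinResult is the KStream<String, String> returned by stream1.outerJoin(...) above
KStream<String, String> windowedOutput = joinResult
        .groupByKey()
        // size the aggregation window to match the 15-second join window
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(15)))
        .reduce((value1, value2) -> value1 + " + " + value2)
        // the windowed reduce yields a KTable<Windowed<String>, String>;
        // unwrap the window key back to a plain String before producing
        .toStream()
        .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value));

windowedOutput.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
```

One thing to keep in mind with `ofSizeWithNoGrace`: records that arrive after their window has closed are dropped rather than folded into a later aggregate.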

HTH
Bill

Hello Bill,

Thank you very much for your response! After adding a second window as you suggested (the same size as the join window), it definitely looks better now: I no longer see messages from older windows, except for the previous one. Meaning, if I push 3 messages to each topic with a 15-second delay between them (which is the window’s size):

  1. key: 0, value: 0-left; key: 0, value: 0-right
  2. key: 0, value: 1-left; key: 0, value: 1-right
  3. key: 0, value: 2-left; key: 0, value: 2-right

I end up with the following results:

    • 0-left + 0-left-0-right
    • 1-left-0-right + 0-left-1-right + 1-left-1-right
    • 2-left + 2-left-2-right

Is there any way to ensure I will process messages from each window exactly once?