I have the following simple Kafka Streams application:
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class SimpleStream {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int partitionSize = 1;

        // Define the Kafka Streams configuration properties
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, partitionSize);
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

        // Create the topics if they do not already exist
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic inputTopic1 = new NewTopic("input-topic-1", partitionSize, (short) 1);
            NewTopic inputTopic2 = new NewTopic("input-topic-2", partitionSize, (short) 1);
            NewTopic outputTopic = new NewTopic("output-topic", partitionSize, (short) 1);
            admin.createTopics(Arrays.asList(inputTopic1, inputTopic2, outputTopic)).all().get();
        }

        // Create a Kafka Streams builder object
        StreamsBuilder builder = new StreamsBuilder();

        // Define the input topics and create corresponding KStream objects
        KStream<String, String> stream1 = builder.stream("input-topic-1");
        KStream<String, String> stream2 = builder.stream("input-topic-2");

        // Join the two input streams using the outerJoin() method
        KStream<String, String> joined = stream1.outerJoin(
                stream2,
                (value1, value2) -> {
                    if (value1 == null && value2 != null) {
                        return value2;
                    } else if (value2 == null && value1 != null) {
                        return value1;
                    } else if (value1 != null) {
                        return value1 + "-" + value2;
                    } else {
                        return "";
                    }
                },
                JoinWindows.of(Duration.ofSeconds(15))
        ).groupByKey()
         .reduce((value1, value2) -> value1 + " + " + value2)
         .toStream();

        // Write the concatenated records to the output topic using the to() method
        joined.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        // Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```
When I insert the following messages during the first window, I get the expected result:
1. Message to the left topic: Key: 0, Value: 1
2. Message to the right topic: Key: 0, Value: 2
3. Message to the left topic: Key: 0, Value: 3
4. Message to the right topic: Key: 0, Value: 4
Result: 1 + 2-1 + 2-3 + 4-1 + 4-3
Now, when the window ends and I insert new messages, the old result ("1 + 2-1 + 2-3 + 4-1 + 4-3") is always aggregated in as well.
Let's say I insert the following message to the left topic: Key: 0, Value: 5. I then get this result:
"1 + 2-1 + 2-3 + 4-1 + 4-3 + 5", but I would expect to get only "5", since it was sent more than a minute after the previous result.
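To make the accumulation concrete, here is a plain-Java sketch of what the reducer appears to be doing (the reducer lambda is copied from my topology; the "previous aggregate" string is just taken from the observed output, not from any Kafka state I inspected):

```java
import java.util.function.BinaryOperator;

public class ReduceSketch {
    public static void main(String[] args) {
        // Same reducer lambda as in the topology above
        BinaryOperator<String> reducer = (v1, v2) -> v1 + " + " + v2;

        // Aggregate left over from the first window (taken from the observed output)
        String previousAggregate = "1 + 2-1 + 2-3 + 4-1 + 4-3";

        // New value "5" arriving well after that window has passed
        String result = reducer.apply(previousAggregate, "5");

        System.out.println(result); // prints "1 + 2-1 + 2-3 + 4-1 + 4-3 + 5"
    }
}
```

So the reduce() step is behaving as if the aggregate from the first window is still the current value for key 0, rather than starting fresh.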
What am I missing here?