Hi there!
I’m working on an aggregation using sliding windows, and I’d expect the aggregated value to return to zero once at least one window duration has elapsed with no events. However, that’s not what I see in my tests using the TopologyTestDriver: the end result is the last computed value, not zero.
I’ve read that sliding windows only recompute a new aggregated value when there is a new event, but will the value also be recomputed when past events become older than one window duration?
Here is the code I’m using:
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class UnderstandSlidingWindowsSimpleCountTest {

  private static final String INPUT_TOPIC_NAME = "input-topic";
  private static final String OUTPUT_TOPIC_NAME = "output-topic";

  private final Serde<String> stringSerde = Serdes.String();
  private final Serde<Long> longSerde = Serdes.Long();
  private final Duration windowSize = Duration.ofSeconds(10);
  private final Instant startTime = Instant.now();

  private TopologyTestDriver testDriver;
  private TestInputTopic<String, String> inputTopic;
  private TestOutputTopic<String, Long> outputTopic;

  @BeforeEach
  void buildTopology() {
    final var builder = new StreamsBuilder();
    builder.stream(INPUT_TOPIC_NAME, Consumed.with(stringSerde, stringSerde))
        .groupByKey()
        .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(windowSize))
        .count(
            Named.as("count_per_key"),
            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-count-per-key")
                .withKeySerde(stringSerde)
                .withValueSerde(longSerde))
        .toStream()
        .peek(
            (key, value) ->
                System.out.printf(
                    "key=%s start=%s end=%s value=%s%n",
                    key.key(), key.window().startTime(), key.window().endTime(), value))
        .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
        .to(OUTPUT_TOPIC_NAME, Produced.with(stringSerde, longSerde));
    var topology = builder.build();
    testDriver = new TopologyTestDriver(topology);
    inputTopic =
        testDriver.createInputTopic(
            INPUT_TOPIC_NAME, stringSerde.serializer(), stringSerde.serializer());
    outputTopic =
        testDriver.createOutputTopic(
            OUTPUT_TOPIC_NAME, stringSerde.deserializer(), longSerde.deserializer());
  }

  @AfterEach
  void closeTopologyTestDriver() {
    testDriver.close();
  }

  @Test
  void countsToZeroAfterWindow() {
    inputTopic.pipeInput("key1", "", startTime);
    inputTopic.pipeInput("key1", "", startTime.plusSeconds(1));
    //inputTopic.advanceTime(windowSize.multipliedBy(3));
    //testDriver.advanceWallClockTime(windowSize);
    inputTopic.pipeInput("key2", "", startTime.plus(windowSize.multipliedBy(3)));
    assertThat(outputTopic.readKeyValuesToMap())
        .containsExactlyInAnyOrderEntriesOf(Map.of("key2", 1L));
  }
}
The test fails with:
java.lang.AssertionError:
Expecting map:
  {"key1"=2L, "key2"=1L}
to contain only:
  ["key2"=1L]
but the following map entries were unexpected:
  ["key1"=2L]
which indicates that a result for key1 is still present in the output even though its messages are older than one window duration. Why is that happening?
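One thing I'm unsure about is how much of this comes from the assertion itself. As far as I understand, readKeyValuesToMap() collapses every record read from the output topic into a map, so a later record for a key replaces an earlier one and a key that has ever produced output stays in the map. Here is a minimal plain-Java sketch of that collapsing (no Kafka involved; the class name, the helper and the emitted sequence are made up for illustration, and the real sequence of updates may differ):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LastValuePerKey {

  // Collapse a sequence of (key, count) updates into a map.
  // A later update for the same key overwrites the earlier one.
  static Map<String, Long> toMap(List<Map.Entry<String, Long>> updates) {
    Map<String, Long> latest = new LinkedHashMap<>();
    for (Map.Entry<String, Long> update : updates) {
      latest.put(update.getKey(), update.getValue());
    }
    return latest;
  }

  public static void main(String[] args) {
    // Assumed, illustrative sequence of output records (not captured from a real run).
    List<Map.Entry<String, Long>> emitted =
        List.of(Map.entry("key1", 1L), Map.entry("key1", 2L), Map.entry("key2", 1L));
    System.out.println(toMap(emitted)); // prints {key1=2, key2=1}
  }
}
```

If that is right, key1 can only disappear from the map (or drop to zero) if the topology actually emits a further record for key1, which does not seem to happen here.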
I’ve tried advancing time with inputTopic.advanceTime() and testDriver.advanceWallClockTime(), but neither of these methods is designed for this. In the end, I’m sending another message with a different key to advance stream time, hoping that this moves the sliding window forward.
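To make the gap between what I expect and what I observe concrete, here is a deliberately simplified plain-Java model for a single key (no Kafka involved; EventDrivenCounter, onRecord and countAt are hypothetical names, and this is not how Kafka Streams implements sliding windows internally). It only emits a count when a record arrives, whereas the value I'd like to see is the count at an arbitrary point in time:

```java
import java.util.ArrayList;
import java.util.List;

class EventDrivenCounter {

  private final long windowMs;
  private final List<Long> timestamps = new ArrayList<>();
  private final List<Long> emitted = new ArrayList<>();

  EventDrivenCounter(long windowMs) {
    this.windowMs = windowMs;
  }

  // Count of records whose timestamp lies in [now - windowMs, now], both ends inclusive.
  long countAt(long now) {
    return timestamps.stream().filter(t -> t >= now - windowMs && t <= now).count();
  }

  // Called only when a record arrives: this is the only place where a result is emitted.
  void onRecord(long timestampMs) {
    timestamps.add(timestampMs);
    emitted.add(countAt(timestampMs));
  }

  List<Long> emitted() {
    return emitted;
  }

  public static void main(String[] args) {
    EventDrivenCounter key1 = new EventDrivenCounter(10_000);
    key1.onRecord(0);
    key1.onRecord(1_000);
    // 30 seconds later there is no new record for key1, so nothing new is emitted...
    System.out.println(key1.emitted()); // prints [1, 2]
    // ...even though the count over the last 10 seconds is now zero.
    System.out.println(key1.countAt(30_000)); // prints 0
  }
}
```

In this model the last emitted value stays at 2 forever unless another record for the same key arrives, which matches what my test shows. What I'm after is something closer to countAt().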