Sliding window aggregation not counting back down to zero

Hi there!

I’m working on an aggregation using sliding windows, and I’d expect the aggregated value to return to zero after at least one window duration without events has elapsed. However, that’s not what I see in my tests using the TopologyTestDriver: the end result is the last computed value, not zero.

I’ve read that sliding windows only recompute a new aggregated value when there is a new event, but will the value also be recomputed when past events become older than one window duration?

Here is the code I’m using:

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class UnderstandSlidingWindowsSimpleCountTest {
  private static final String INPUT_TOPIC_NAME = "input-topic";
  private static final String OUTPUT_TOPIC_NAME = "output-topic";
  private final Serde<String> stringSerde = Serdes.String();
  private final Serde<Long> longSerde = Serdes.Long();
  private final Duration windowSize = Duration.ofSeconds(10);
  private final Instant startTime = Instant.now();
  private TopologyTestDriver testDriver;
  private TestInputTopic<String, String> inputTopic;
  private TestOutputTopic<String, Long> outputTopic;

  @BeforeEach
  void buildTopology() {
    final var builder = new StreamsBuilder();
    builder.stream(INPUT_TOPIC_NAME, Consumed.with(stringSerde, stringSerde))
        .groupByKey()
        .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(windowSize))
        .count(
            Named.as("count_per_key"),
            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-count-per-key")
                .withKeySerde(stringSerde)
                .withValueSerde(longSerde))
        .toStream()
        .peek(
            (key, value) ->
                System.out.printf(
                    "key=%s start=%s end=%s value=%s%n",
                    key.key(), key.window().startTime(), key.window().endTime(), value))
        .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
        .to(OUTPUT_TOPIC_NAME, Produced.with(stringSerde, longSerde));
    var topology = builder.build();
    testDriver = new TopologyTestDriver(topology);
    inputTopic =
        testDriver.createInputTopic(
            INPUT_TOPIC_NAME, stringSerde.serializer(), stringSerde.serializer());
    outputTopic =
        testDriver.createOutputTopic(
            OUTPUT_TOPIC_NAME, stringSerde.deserializer(), longSerde.deserializer());
  }

  @AfterEach
  void closeTopologyTestDriver() {
    testDriver.close();
  }

  @Test
  void countsToZeroAfterWindow() {
    inputTopic.pipeInput("key1", "", startTime);
    inputTopic.pipeInput("key1", "", startTime.plusSeconds(1));
    // Attempts to advance stream time that did not help (see below):
    //inputTopic.advanceTime(windowSize.multipliedBy(3));
    //testDriver.advanceWallClockTime(windowSize);
    // Send an event with a different key to advance stream time past the window:
    inputTopic.pipeInput("key2", "", startTime.plus(windowSize.multipliedBy(3)));
    assertThat(outputTopic.readKeyValuesToMap())
        .containsExactlyInAnyOrderEntriesOf(Map.of("key2", 1L));
  }
}

The test fails with

java.lang.AssertionError:
Expecting map:
{"key1"=2L, "key2"=1L}
to contain only:
["key2"=1L]
but the following map entries were unexpected:
["key1"=2L]

which indicates there is still a result for key1 around even though its messages are older than one window. Why is that happening?

I’ve tried advancing the stream time with inputTopic.advanceTime() and testDriver.advanceWallClockTime(), but neither of these methods is designed for this. Eventually, I resorted to sending another message with a different key to advance stream time and, hopefully, move the sliding window forward.

Your observation is correct.

While Kafka Streams does update sliding windows when an event drops out, it only does such an update if the updated window is not empty.

Is this current behavior a problem for your use case? (Thinking about it, it might actually be possible to change the behavior…)

For your test setup though, you would get multiple results for key1, as you are reading the output topic, which contains all (intermediate) results. – Note that topics are append-only, and thus, even if a window expires, the old window results are still there as long as topic retention did not kick in to remove them.

Given your window size of 10 seconds, you should actually get three windows for key1: one with count 1 (for the first key1 event, with window bounds [r1.ts-10s, r1.ts]), one with count 2 (with window bounds [r2.ts-10s, r2.ts]), and again one with count 1 when the first event drops out of the window, with bounds [r1.ts+1ms, r1.ts+10,001ms]. (Note that the order in which the windows are emitted is actually different: the last count=1 window is emitted before the count=2 window.)

You only get one result for key1 though, as your map(...) after the aggregation removes the window information, and thus only the last emitted update will be observed in the Map you assemble… And given that the count=2 window is emitted last, it’s the result you observe in your Map.
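To make all of these intermediate results visible in the test, one option is to read the output topic as a list instead of collapsing it into a map. A small sketch, reusing the outputTopic from the test above:

// Sketch: read every record, so intermediate window updates are not
// collapsed into a single Map entry per key.
final List<KeyValue<String, Long>> allResults = outputTopic.readKeyValuesToList();
allResults.forEach(kv -> System.out.printf("key=%s count=%d%n", kv.key, kv.value));
// Per the explanation above, this should print three records for key1
// (counts 1, 1, 2) and one record for key2 (count 1).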

Thank you for the explanation. :smiley:

Well yes, in the end, I want to count events per key within a 24-hour window. So this count should go back to zero (or have no result for a given key) when there is no event for a key in the last 24 hours.

This .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value)) is also something I’m not sure about. I saw it in a book, but I don’t understand its purpose beyond making the topic readable without having to use Serde<Windowed<String>> for the key… Should I test this differently?

Eventually, I plan to read the output topic into a GlobalKTable in another application to join it with a KStream (i.e. looking things up in this table). Should the output topic be configured with a retention time equal to the window size, so that old windows are dropped?

Well yes, in the end, I want to count events per key within a 24-hour window. So this count should go back to zero (or have no result for a given key) when there is no event for a key in the last 24 hours.

What do you mean by “or have no result for a given key”? That is exactly what you get: no new results for a key… Of course, the previous intermediate results are still in the topic until retention time kicks in, but if you take the window bounds into account, you can drop/expire old results in your application.
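For example, assuming the windowed key is kept in the output (see the next reply), a consuming application could decide expiry with a check along these lines; the exact rule is an assumption for illustration:

import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.kstream.Windowed;

// Sketch: treat a stored window result as expired once its end time is more
// than one window size in the past (24 hours for the use case above).
static boolean isExpired(final Windowed<String> key, final Instant now) {
  return key.window().endTime().isBefore(now.minus(Duration.ofHours(24)));
}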

In any case, you might want to file a Jira ticket with a feature/improvement request: see “Follow current development and issues”.

I saw it in a book, but I don’t understand its purpose beyond making the topic readable without having to use Serde<Windowed<String>> for the key… Should I test this differently?

Yes, that’s a simplification only to make the Serde setup easier. In practice, I would not recommend this, but would keep the windowed information.
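Kafka Streams ships serdes for windowed keys, so the window does not need to be mapped away. A sketch based on the test above, assuming counts is the KTable<Windowed<String>, Long> returned by count(...):

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

// Sketch: keep the window in the key instead of mapping it away. The serde's
// window size must match the aggregation's window size (10 seconds above).
final Serde<Windowed<String>> windowedKeySerde =
    WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSize.toMillis());
counts.toStream()
    .to(OUTPUT_TOPIC_NAME, Produced.with(windowedKeySerde, longSerde));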

Eventually, I plan to read the output topic into a GlobalKTable in another application to join it with a KStream (i.e. looking things up in this table). Should the output topic be configured with a retention time equal to the window size, so that old windows are dropped?

This does complicate things… The topic that you use for a GlobalKTable should be configured with log compaction, not retention. – And there is no built-in “expiration” mechanism for older windows. – It also seems you might want to get only the most recent window result into the GlobalKTable?
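For example, a sketch of creating such a compacted topic with the Admin client; the bootstrap server, topic name, partition count, and replication factor are placeholders:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

static void createCompactedTopic() throws Exception {
  final Properties props = new Properties();
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  try (Admin admin = Admin.create(props)) {
    // cleanup.policy=compact keeps only the latest value per key
    final NewTopic topic = new NewTopic("output-topic", 1, (short) 1)
        .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
    admin.createTopics(List.of(topic)).all().get();
  }
}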

I would recommend two things: first, use the emit-final strategy to only get results of completed windows and to avoid partial results; and second, use a post-processing step after the aggregation, and before you write into the sink topic, to generate tombstones for old windows. This would best be done with a process() step and a state store, i.e., each new window gets stored and emitted forward. Plus, use punctuations to find old/expired windows and generate tombstones for them.
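A minimal sketch of such a processor follows. The class name, the store name "latest-window-end", the punctuation interval, and the exact expiry rule are illustrative assumptions:

import java.time.Duration;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

class WindowExpirationProcessor implements Processor<Windowed<String>, Long, String, Long> {
  private ProcessorContext<String, Long> context;
  private KeyValueStore<String, Long> latestWindowEnd; // key -> end time of its latest window

  @Override
  public void init(final ProcessorContext<String, Long> context) {
    this.context = context;
    latestWindowEnd = context.getStateStore("latest-window-end");
    // Periodically scan for expired windows; the interval is an arbitrary choice.
    context.schedule(Duration.ofMinutes(1), PunctuationType.STREAM_TIME, this::emitTombstones);
  }

  @Override
  public void process(final Record<Windowed<String>, Long> record) {
    // Remember the latest window end per key and forward the result with a plain key.
    latestWindowEnd.put(record.key().key(), record.key().window().end());
    context.forward(record.withKey(record.key().key()));
  }

  private void emitTombstones(final long streamTime) {
    try (KeyValueIterator<String, Long> it = latestWindowEnd.all()) {
      while (it.hasNext()) {
        final var entry = it.next();
        if (entry.value < streamTime) { // window closed without a newer one for this key
          context.forward(new Record<>(entry.key, null, streamTime)); // tombstone
          latestWindowEnd.delete(entry.key);
        }
      }
    }
  }
}

The store would need to be registered with the topology and connected to the process() step; see the wiring sketch further down.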

(It’s getting a little bit complicated now, so it will be difficult to go into more detail – I hope the high-level idea is clear?)

I mean that a lookup like map.get(key) on the topic’s content either returns zero as the value or null (no value).

Yes, I would like to have only the most recent aggregation result per key in that GlobalKTable.

I implemented your two suggestions, adding .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) to the topology right after count(), and a Processor/Punctuator combo to emit tombstones for windows whose end time is older than “now minus window size”. I hope I got it right, but I still need to write some tests to confirm my understanding.
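For reference, the amended topology might look like this, reusing the names from the test above and the hypothetical WindowExpirationProcessor sketched earlier (requires imports for org.apache.kafka.streams.kstream.Suppressed and org.apache.kafka.streams.state.Stores):

// Register the processor's store first (name reused from the sketch above).
builder.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("latest-window-end"), stringSerde, longSerde));

builder.stream(INPUT_TOPIC_NAME, Consumed.with(stringSerde, stringSerde))
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(windowSize))
    .count(
        Named.as("count_per_key"),
        Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-count-per-key")
            .withKeySerde(stringSerde)
            .withValueSerde(longSerde))
    // 1) emit only final results for closed windows
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    // 2) drop the window from the key and emit tombstones for expired windows
    .process(WindowExpirationProcessor::new, "latest-window-end")
    .to(OUTPUT_TOPIC_NAME, Produced.with(stringSerde, longSerde));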

This indeed makes things more complicated. Would my use case be better served by hopping windows instead of sliding windows, or are they just as tricky to handle? Depending on the business case, we may not need results as often as sliding windows can produce them (I need to go and check this assumption with the right people).

Where can I find documentation about details like the number of messages emitted by sliding windows, or the appropriate topic configuration for a GlobalKTable? I’ve read two books on Kafka Streams and they didn’t mention this, nor did I see this kind of information in the Kafka Streams API Javadoc… The web is full of simple examples, but nothing like this turns up in searches.

I mean that a lookup like map.get(key) on the topic’s content either returns zero as the value or null (no value).

That’s not really how Kafka, as an append-only log storage, works…

Would my use case be better served by hopping windows instead of sliding windows, or are they just as tricky to handle

I don’t think there would be a difference in complexity. But the two window types have quite different semantics, and you should pick the window type based on business logic needs.

Where can I find documentation about details like the number of messages emitted by sliding windows, or the appropriate topic configuration for a GlobalKTable? I’ve read two books on Kafka Streams and they didn’t mention this, nor did I see this kind of information in the Kafka Streams API Javadoc… The web is full of simple examples, but nothing like this turns up in searches.

Seems we need to improve our docs… Did you check out the JavaDocs? We updated them significantly (the work is not completed though) with the 4.0.0 release.

In general, Kafka is an open-source project, and it depends on the community. So if you see gaps, please join in and open PRs (or at least file Jira tickets) so the docs etc. can be improved.