How to set Tumbling Window?

Hello frens,
I want to have such tumbling window:

but my code:

distinctTradeStream
        .mapValues(v -> v.getEventTimestamp())
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        .aggregate(
                () -> null,
                (key, value, agg) -> value,
                Materialized.with(stringSerde, longSerde))
        .toStream().print(Printed.toSysOut());

produces such timestamps:

[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824040000/1620824100000], 1620824089981
[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824100000/1620824160000], 1620824109190
[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824100000/1620824160000], 1620824149766
[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824160000/1620824220000], 1620824167557
[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824160000/1620824220000], 1620824194857
[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824220000/1620824280000], 1620824239974
[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824220000/1620824280000], 1620824273877
[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824280000/1620824340000], 1620824332134
[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824340000/1620824400000], 1620824353515
[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824340000/1620824400000], 1620824397734

which shows that the wide of the windows is much more frequent than 1 minute:
(the timestamp converted to datetime (it means the last time stamp of the window)):

12:54:49.981
12:55:09.190
12:55:49.766
12:56:07.557
12:56:34.857
12:57:19.974
12:57:53.877
12:58:52.134
12:59:13.515
12:59:57.734

As you can see from the output, there are three timestamps associated with each window:

[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824040000/1620824100000], 1620824089981

window start timestamp: 1620824040000 (encoded in the message key)
window end timestamp: 1620824100000 (not physically stored in the message, but computed as “window start timestamp + window size”)
and record timestamp: 1620824089981 itself.

The record timestamp is not the same as the window end timestamp but it is computed as max(r1.ts,...,rn.ts) over all record r1,...,rn that fall into the window. So it’s basically the timestamp of the last update to the window.

1 Like

thx fren,
now I can see that windows are duplicated with different result:

[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824100000/1620824160000], 1620824109190
[KTABLE-TOSTREAM-0000000007]: [dotusd@1620824100000/1620824160000], 1620824149766

why? How to configure it to receive only 1 result on 1 window?

Kafka Streams implements a continuous update model: note that the result of aggregate() is a KTable. This table will contain one row per key and window, and each time a new input record is received, the corresponding row will be updated.

When you call toStream() you request the changelog stream of the table. Internally, Kafka Streams applies caching to tables, and thus, you see multiple updates per default (if you would disable caching, you would see all updates, ie, one for each input record). (Cf Kafka Streams Memory Management | Confluent Documentation)

If you only want to get the “final” result in the stream, you can apply the suppress() operator after the aggregation. Note that you would want to specify a gracePeriod on your window definition, because otherwise it defaults to 24h, and thus suppress() would not emit anything for a day…

For more background you should check out this blog post: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/

There is also a Kafka Summit talk about this topic: https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/

1 Like