TimeWindow flushing data instantly

I have a requirement to count the number of packets per key over a time period and publish the result to a topic once the time period has elapsed.

For example, I frequently receive packets in the pattern below:

Rank	Time
1	1/10/2021 11:30:00
2	1/11/2021 11:31:00
1	1/10/2021 11:32:00
2	1/11/2021 11:33:00
2	1/11/2021 11:34:00

If I have a 5-minute window, then every 5 minutes the accumulated data should be published to a topic, like this:

Rank	Count
1	2
2	3

You might want to use the suppress() operator if you only want to get the final result.

Thanks for your reply.

I am using suppress(), but the final result is still not held back for the specified window time. Below is my code snippet:

KStream<String, Message> stream = kStreamsBuilder.stream("IN", Consumed.with(Serdes.String(), CustomSerdes.Message()));

KStream<Windowed<String>, Long> windowKStream = stream.groupBy((key, value) -> formGroupingKey(value))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(60)))
                .count(Materialized.with(Serdes.String(), Serdes.Long()))
                .suppress(Suppressed.untilTimeLimit(60, Suppressed.BufferConfig.unbounded()))
                .toStream()
                .to("OUT", Produced.with(Serdes.String(), CustomSerdes.OutData()));

You should use untilWindowCloses() instead of untilTimeLimit().

Thanks mjsax,

I have tried suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())), but now the data is not getting flushed out.

It will get flushed out when stream-time advances beyond window-close time (that is, window-end plus grace-period). By default, the grace-period is set to 24h, though. You might want to reduce it via TimeWindows.of(Duration.ofMinutes(60)).grace(...).

Also note: it’s not based on wall-clock time; stream-time is only advanced when new records are processed.
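
To make the stream-time point concrete, here is a toy model in plain Java. It is not Kafka Streams code: the class name StreamTimeDemo, the process() method, and the "rank=count" output format are made up for illustration. It only mimics the rule described above, namely that a window's result is released once the highest record timestamp seen so far reaches window-end plus grace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of window-close semantics (hypothetical, not the Kafka Streams API). */
public class StreamTimeDemo {
    private final long windowSizeMs;
    private final long graceMs;
    // Stream-time: the highest record timestamp seen so far, never the wall clock.
    private long streamTime = Long.MIN_VALUE;
    // Window start -> (rank -> count), sorted so the oldest window closes first.
    private final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();

    public StreamTimeDemo(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Processes one record and returns the results of any windows that closed. */
    public List<String> process(String rank, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);

        // Count the record unless its window has already closed (late beyond grace).
        if (windowStart + windowSizeMs + graceMs > streamTime) {
            open.computeIfAbsent(windowStart, k -> new TreeMap<>()).merge(rank, 1L, Long::sum);
        }

        // Release every window whose end plus grace has been reached by stream-time.
        List<String> emitted = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + windowSizeMs + graceMs <= streamTime) {
            Map<String, Long> counts = open.pollFirstEntry().getValue();
            counts.forEach((r, c) -> emitted.add(r + "=" + c));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // 5-minute windows, no grace; timestamps are offsets in milliseconds.
        StreamTimeDemo demo = new StreamTimeDemo(5 * 60_000L, 0L);
        String[] ranks = {"1", "2", "1", "2", "2"};
        for (int i = 0; i < ranks.length; i++) {
            System.out.println(demo.process(ranks[i], i * 60_000L)); // prints [] each time
        }
        // A record in the next window advances stream-time and releases the first window.
        System.out.println(demo.process("1", 5 * 60_000L)); // prints [1=2, 2=3]
    }
}
```

Note that the first five records never release anything on their own. The sixth record, which belongs to the next window, is what pushes stream-time forward. On a quiet topic the last window stays buffered until another record arrives.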

Thanks mjsax,

After adding a grace period, it is flushing out correctly.
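
For anyone landing here later, a minimal sketch of what the working topology might look like. It assumes plain String values that carry the rank; the original uses a custom Message type, formGroupingKey(), and CustomSerdes, none of which are shown in this thread. The 5-minute window and 10-second grace are illustrative values, and the class name RankCountTopology is hypothetical. It needs kafka-streams on the classpath. On Kafka Streams 3.0 and later, TimeWindows.ofSizeAndGrace(...) replaces the deprecated of(...).grace(...).

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class RankCountTopology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("IN", Consumed.with(Serdes.String(), Serdes.String()))
            // Assumption: the record value is the rank, so it becomes the grouping key.
            .groupBy((key, rank) -> rank, Grouped.with(Serdes.String(), Serdes.String()))
            // Explicit grace period, so the window closes shortly after it ends
            // instead of after the 24h default mentioned above.
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofSeconds(10)))
            .count(Materialized.with(Serdes.String(), Serdes.Long()))
            // Hold back intermediate updates; emit one final count per window and key.
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            // Unwrap the windowed key so a plain String serde can be used on output.
            .map((windowedRank, count) -> KeyValue.pair(windowedRank.key(), count))
            .to("OUT", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

The map() step drops the window boundaries from the key. If downstream consumers need to know which window a count belongs to, keep the Windowed key and write it with a windowed serde instead.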
