I have a requirement to count the number of packets per key over a time period and publish the counts to a topic once the time period has elapsed.
For example, I frequently receive packets in the pattern below:
Rank Time
1 1/10/2021 11:30:00
2 1/11/2021 11:31:00
1 1/10/2021 11:32:00
2 1/11/2021 11:33:00
2 1/11/2021 11:34:00
With a 5-minute window, the accumulated counts should be published to a topic every 5 minutes, like this:
Rank Count
1 2
2 3
mjsax
17 January 2022 18:42
You might want to use the suppress() operator if you only want to get the final result.
Thanks for your reply.
I am using suppress(), but the final result is still not being held back for the specified window time. Below is my code snippet:
KStream<String, Message> stream = kStreamsBuilder.stream("IN", Consumed.with(Serdes.String(), CustomSerdes.Message()));
KStream<Windowed<String>, Long> windowKStream = stream.groupBy((key, value) -> formGroupingKey(value))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(60)))
    .count(Materialized.with(Serdes.String(), Serdes.Long()))
    .suppress(Suppressed.untilTimeLimit(60, Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to("OUT", Produced.with(Serdes.String(), CustomSerdes.OutData()));
mjsax
18 January 2022 18:48
You should use untilWindowCloses() instead of untilTimeLimit().
Thanks mjsax,
I have tried suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())), but now no data is being flushed out at all.
mjsax
20 January 2022 16:03
It will get flushed out when stream-time advances beyond the window-close time (that is, window-end plus grace-period). By default, the grace-period is set to 24h though. You might want to reduce it via TimeWindows.of(Duration.ofMinutes(60)).grace(...).
Also note: it’s not based on wall-clock time; stream-time is only advanced when new records are processed.
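To make the stream-time point concrete, here is a small plain-Java model of the behaviour (no Kafka dependency). It is only a rough sketch, not how Kafka Streams is implemented: the class name StreamTimeDemo, the 5-minute window, and the 1-minute grace period are all assumptions chosen for illustration. Stream-time is modelled as the largest record timestamp seen so far, and a window's final counts are emitted only once stream-time reaches window-end plus grace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of "final result per window", driven by stream-time only.
public class StreamTimeDemo {
    static final long WINDOW_MS = 5 * 60_000L; // assumed window size: 5 minutes
    static final long GRACE_MS = 60_000L;      // assumed grace period: 1 minute

    // Stream-time: the highest record timestamp observed so far.
    private long streamTime = Long.MIN_VALUE;
    // Open windows: window start -> (key -> count), ordered by window start.
    private final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();

    // Processes one record and returns the final counts of any windows it closed.
    public List<String> process(String key, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % WINDOW_MS);

        // Count the record unless its window has already closed (late record).
        if (windowStart + WINDOW_MS + GRACE_MS > streamTime) {
            open.computeIfAbsent(windowStart, w -> new TreeMap<>())
                .merge(key, 1L, Long::sum);
        }

        // Emit every window whose close time (end + grace) has been reached.
        List<String> emitted = new ArrayList<>();
        while (!open.isEmpty()
                && open.firstKey() + WINDOW_MS + GRACE_MS <= streamTime) {
            Map<String, Long> counts = open.pollFirstEntry().getValue();
            counts.forEach((k, c) -> emitted.add(k + "=" + c));
        }
        return emitted;
    }

    public static void main(String[] args) {
        StreamTimeDemo demo = new StreamTimeDemo();
        long min = 60_000L;
        String[] ranks = {"1", "2", "1", "2", "2"};
        // Five records inside the first window: nothing is emitted yet.
        for (int i = 0; i < ranks.length; i++) {
            System.out.println(demo.process(ranks[i], i * min)); // []
        }
        // Only a newer record moves stream-time past window-end + grace.
        System.out.println(demo.process("1", 6 * min)); // [1=2, 2=3]
    }
}
```

In this model, no amount of waiting after the fifth record produces output; the first window's counts appear only when the sixth record arrives and pushes stream-time forward.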
Thanks mjsax,
After adding grace time it is flushing out correctly.
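For anyone landing here later, a topology along these lines might look roughly like the sketch below. It is not my exact code: it assumes plain String values, uses the value itself as a stand-in for formGroupingKey(value), picks a 5-minute window with a 30-second grace period, and writes Long counts keyed by the plain grouping key. It also assumes Kafka Streams 3.0 or newer, where TimeWindows.ofSizeAndGrace(...) replaces the TimeWindows.of(...).grace(...) form used above.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class FinalWindowCountTopology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Assumption: String keys and values; the real Message type and
        // CustomSerdes from the thread are not reproduced here.
        builder.stream("IN", Consumed.with(Serdes.String(), Serdes.String()))
            // Assumption: the value itself is the grouping key
            // (stand-in for formGroupingKey(value)).
            .groupBy((key, value) -> value,
                     Grouped.with(Serdes.String(), Serdes.String()))
            // 5-minute tumbling windows with an explicit, short grace period
            // instead of the 24h default mentioned above.
            .windowedBy(TimeWindows.ofSizeAndGrace(
                Duration.ofMinutes(5), Duration.ofSeconds(30)))
            .count(Materialized.with(Serdes.String(), Serdes.Long()))
            // Hold back intermediate updates; emit one result per window
            // once stream-time passes window-end plus grace.
            .suppress(Suppressed.untilWindowCloses(
                Suppressed.BufferConfig.unbounded()))
            .toStream()
            // Drop the window from the key so a plain String serde can be used.
            .map((windowedKey, count) ->
                KeyValue.pair(windowedKey.key(), count))
            .to("OUT", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

Keep in mind that the last window only flushes once a later record arrives, since stream-time does not advance on its own.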