Hi,
I'm trying to aggregate and count events over 1-minute tumbling windows.
I'm processing a topic from the start (beginning of March), but I'm getting loads of "expired window" warnings in the log.
I don't really understand why the window is expired. Here is one of the log entries:
2022-03-25 13:45:29.263 WARN 24112 --- [-StreamThread-1] o.a.k.s.k.i.KStreamWindowAggregate : Skipping record for expired window.
key=[a key] topic=[reporter2-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition] partition=[8] offset=[29599]
timestamp=[1646567607318] -> Sun Mar 06 2022 11:53:27
window=[1646567580000, 1646567640000) -> Sun Mar 06 2022 11:53:00 to Sun Mar 06 2022 11:54:00
expiration=[1646642662531] -> Mon Mar 07 2022 08:44:22
streamTime=[1646642962531] -> Mon Mar 07 2022 08:49:22
I've converted the millis to actual datetimes in the log for clarity.
Looking at this log, I can see the timestamp of the event is within the time window.
The timestamp is also before the expiration.
I'm not sure what streamTime is, but since I was running this just now, it seems weird that it's on March 7th.
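To sanity-check the numbers, I did the raw arithmetic on the millis from the warning in plain Java (no Kafka involved; the 5 minutes is the grace period from my config below):

```java
import java.time.Duration;

public class ExpiryCheck {
    public static void main(String[] args) {
        long timestamp   = 1646567607318L; // Sun Mar 06 2022 11:53:27
        long windowStart = 1646567580000L; // Sun Mar 06 2022 11:53:00
        long windowEnd   = 1646567640000L; // Sun Mar 06 2022 11:54:00
        long expiration  = 1646642662531L; // Mon Mar 07 2022 08:44:22
        long streamTime  = 1646642962531L; // Mon Mar 07 2022 08:49:22

        // The record does fall inside its window...
        System.out.println(timestamp >= windowStart && timestamp < windowEnd); // true

        // ...and the logged expiration is exactly streamTime minus my 5-minute grace.
        System.out.println(streamTime - expiration == Duration.ofMinutes(5).toMillis()); // true

        // But the window's end is long before that expiration point.
        System.out.println(windowEnd < expiration); // true
    }
}
```

So expiration seems tied to streamTime and the grace period rather than to wall-clock time, but I don't understand why streamTime is on March 7th.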
This is my stream config:
// needs: import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
Duration windowSize = Duration.ofMinutes(1);
TimeWindows tumblingWindow = TimeWindows.ofSizeAndGrace(windowSize, Duration.ofMinutes(5));

activities
    .groupBy((key, record) -> buildCompositeField(record))
    .windowedBy(tumblingWindow)
    .aggregate(StatsPerMinute::new,
        (key, record, statsPerMinute) -> {
            statsPerMinute.setName(Objects.requireNonNullElse(record.getName(), ""));
            statsPerMinute.setOtherVariables("bla bla");
            statsPerMinute.setKey(key);
            statsPerMinute.setCount(statsPerMinute.getCount() + 1);
            return statsPerMinute;
        },
        Materialized.with(Serdes.String(), statsPerMinuteSerde))
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()
    .peek((k, statsPerMinute) -> {
        statsPerMinute.setFrom(k.window().startTime());
        statsPerMinute.setTo(k.window().endTime());
    })
    .to("statstest2");
If anyone knows what's going wrong here, i.e. why so many of the old events are "expired", I'd be happy for any pointers!
Once the old events are processed, any new events seem to be aggregated just fine.