Window expiration

Hi,
I'm trying to aggregate and count events over 1-minute tumbling windows.
I'm processing a topic from the start (beginning of March), but I'm getting loads of “expired window” warnings in the log.
I don't really understand why the window is expired. Check out this log:

2022-03-25 13:45:29.263  WARN 24112 --- [-StreamThread-1] o.a.k.s.k.i.KStreamWindowAggregate       : Skipping record for expired window.
 key=[a key]  topic=[reporter2-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition] partition=[8] offset=[29599] 
 timestamp=[1646567607318] Sun Mar 06 2022 11:53:27
 window=   [1646567580000, Sun Mar 06 2022 11:53:00
            1646567640000) Sun Mar 06 2022 11:54:00
 expiration[1646642662531] Mon Mar 07 2022 08:44:22
 streamTime[1646642962531] Mon Mar 07 2022 08:49:22

I have converted the millis to actual datetimes in the log just for clarity.
When I look at this log I can see that the timestamp of the event is within the time window.
The timestamp is also before the expiration.
I'm not sure what streamTime is, but since I was running this just now, it seems weird that it is on March 7th.
This is my stream config:

        // 1-minute tumbling windows with a 5-minute grace period for out-of-order events
        Duration windowSize = Duration.ofMinutes(1);
        TimeWindows tumblingWindow = TimeWindows.ofSizeAndGrace(windowSize, Duration.ofMinutes(5));

        activities
                .groupBy((key, record) -> buildCompositeField(record))
                .windowedBy(tumblingWindow)
                .aggregate(StatsPerMinute::new, (key, record, statsPerMinute) -> {
                            statsPerMinute.setName(Objects.requireNonNullElse(record.getName(), ""));
                            statsPerMinute.setOtherVariables("bla bla");
                            statsPerMinute.setKey(key);
                            statsPerMinute.setCount(statsPerMinute.getCount() + 1);
                            return statsPerMinute;
                        },
                        Materialized.with(Serdes.String(), statsPerMinuteSerde))
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream()
                .peek((k, statsPerMinute) -> {
                    statsPerMinute.setFrom(k.window().startTime());
                    statsPerMinute.setTo(k.window().endTime());
                })
                .to("statstest2");

If anyone knows what's going wrong here, and why so many of the old events are “expired”, I would be happy for any pointers!
Once the old events are processed, any new events seem to be aggregated just fine.

Hi dahlsdata,

What you're seeing is expected behavior. Stream time is the highest timestamp a processor has seen so far, and it never moves backward. If a record comes in with an earlier timestamp, stream time stays where it is. So any record arriving with a timestamp less than (stream time - grace period) is considered late and is not added to the windowed aggregation.

So in this case, it looks like the activities aggregation has already processed a record with a timestamp of March 7 @ 08:49, so any record arriving with a timestamp prior to March 7 @ 08:44 will get dropped. Based on your description, that seems to be exactly what you are observing.
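
To make it concrete, here is the same arithmetic spelled out with the numbers from your log (just an illustration of the check, not the actual Kafka Streams source):

        long windowEnd  = 1646567640000L;            // Sun Mar 06 2022 11:54:00, end of the record's window
        long grace      = Duration.ofMinutes(5).toMillis();
        long streamTime = 1646642962531L;            // Mon Mar 07 2022 08:49:22, highest timestamp seen so far
        long closeTime  = streamTime - grace;        // 1646642662531, the "expiration" in the log

        // The window ended long before closeTime, so the record is skipped
        // even though its own timestamp falls inside the window.
        boolean expired = windowEnd <= closeTime;    // true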

HTH,
Bill

Thanks for your reply and information,

Ok, yes, that seems to be in line with what I am seeing, so that is probably exactly what's going on.

But then, any clues on how I should go about fixing my issue?
The timestamp is event time (extracted through the TimestampExtractor interface), so all events are produced to the topic in an orderly fashion.
The topic has 10 partitions, so I know exact ordering is not guaranteed, hence the 5-minute grace period.
I would really like not to increase the grace period too much. In this case it would have had to be almost 21 hours for the window not to have expired, and I can't really run with a grace period of around 24 hours in production.

So is there any way to instruct Kafka Streams to consume the events in a somewhat more orderly fashion across the partitions?
Or how are these kinds of issues normally solved?

Thankful for any advice!
Eric

I get your point about not wanting to extend the grace period that far.
IMHO, the best bet would be a Processor API integration via transformValues. You can create a state store with the Stores class, possibly even a TimestampedKeyValueStore.
With the transformValues approach you can ensure that all records are included in an aggregation, no matter how late they arrive. A rough sketch follows below.
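
Something along these lines (this is just my illustration, not tested code: the store name "stats-per-minute-store", the Activity value type, and the StatsPerMinute accessors are assumptions based on your snippet above, and builder is the StreamsBuilder). The transformer buckets each record by its own event timestamp and updates a count in the state store, so stream time and the grace period never come into play:

        StoreBuilder<TimestampedKeyValueStore<String, StatsPerMinute>> storeBuilder =
                Stores.timestampedKeyValueStoreBuilder(
                        Stores.persistentTimestampedKeyValueStore("stats-per-minute-store"),
                        Serdes.String(),
                        statsPerMinuteSerde);
        builder.addStateStore(storeBuilder);

        activities
                .transformValues(() -> new ValueTransformerWithKey<String, Activity, StatsPerMinute>() {
                    private ProcessorContext context;
                    private TimestampedKeyValueStore<String, StatsPerMinute> store;

                    @Override
                    public void init(final ProcessorContext context) {
                        this.context = context;
                        this.store = context.getStateStore("stats-per-minute-store");
                    }

                    @Override
                    public StatsPerMinute transform(final String key, final Activity record) {
                        // Bucket by the record's own (event) timestamp, aligned to the minute
                        final long windowStart = (context.timestamp() / 60_000L) * 60_000L;
                        final String storeKey = buildCompositeField(record) + "@" + windowStart;

                        // Load the running count for this key+minute bucket, update it, and store it back
                        final ValueAndTimestamp<StatsPerMinute> existing = store.get(storeKey);
                        final StatsPerMinute stats = existing == null ? new StatsPerMinute() : existing.value();
                        stats.setCount(stats.getCount() + 1);
                        store.put(storeKey, ValueAndTimestamp.make(stats, context.timestamp()));
                        return stats;
                    }

                    @Override
                    public void close() { }
                }, "stats-per-minute-store")
                .to("statstest2");

The trade-off is that you own the window lifecycle yourself: you have to decide when a per-minute bucket is final and emit or expire it (for example with a punctuator), instead of letting suppress() and the grace period do that for you.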