I am using a session window and am unclear about when I get the results. How do I know when the session for a particular key has ended, and when are the results emitted so that I can take the aggregate and do further processing? For example, say I want the total count of a user’s errors. Each user has their own session, determined by the key. Once the 5 minute window and 30 second grace period have ended, I would expect to get a final event with the user data and the total count within that window. Instead, I only see the events as they happen within the window, not the final tally or any indication that the window closed. This is my topology:
    public void process(KStream<String, UserEvent> input) {
        input.peek((k, v) -> log.info("Key = " + k + " Create Time = "
                + Instant.ofEpochMilli(v.getCreatedTime()).atOffset(ZoneOffset.UTC)))
            .groupByKey()
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
            .count()
            //.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(5), Suppressed.BufferConfig.unbounded()))
            .toStream()
            .foreach((k, v) -> log.info(
                "UserID: " + k.key() +
                " Window start: " + Instant.ofEpochMilli(k.window().start()).atOffset(ZoneOffset.UTC) +
                " Window end: " + Instant.ofEpochMilli(k.window().end()).atOffset(ZoneOffset.UTC) +
                " Count: " + v + " Window#: " + k.window().hashCode()));
    }
My logs show events as they arrive, but I expect to see only one event per key, once the window plus grace period has expired, so that I can take the key and its summary and do further processing.
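To make the expected timing concrete, here is a toy model in plain Java. It is not Kafka Streams code, and the names `SessionCloseModel`, `Session`, and `onRecord` are made up for illustration. It assumes records arrive in timestamp order, that "stream time" is simply the highest record timestamp seen so far (not wall-clock time), and that a session's final result is released once stream time reaches last event time + inactivity gap + grace. That closing rule is my understanding of how a suppressed session window behaves, so treat it as an assumption rather than a statement about the library.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of when a session's final count would be released. Not Kafka Streams code. */
final class SessionCloseModel {

    /** One pending session: [start, end] in event time, plus a running count. */
    static final class Session {
        final String key;
        final long start;
        long end;
        long count = 1;

        Session(String key, long timestampMs) {
            this.key = key;
            this.start = timestampMs;
            this.end = timestampMs;
        }

        @Override
        public String toString() {
            return key + "[" + start + "," + end + "] count=" + count;
        }
    }

    private final long gapMs;
    private final long graceMs;
    private final List<Session> pending = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE; // highest record timestamp seen so far

    SessionCloseModel(Duration gap, Duration grace) {
        this.gapMs = gap.toMillis();
        this.graceMs = grace.toMillis();
    }

    /** Feeds one record (assumed to be in timestamp order) and returns the sessions it closes. */
    List<Session> onRecord(String key, long timestampMs) {
        // Stream time only moves when a record arrives, whatever its key.
        streamTime = Math.max(streamTime, timestampMs);

        // Find the most recent pending session for this key, if there is one.
        Session latest = null;
        for (Session s : pending) {
            if (s.key.equals(key)) {
                latest = s;
            }
        }

        if (latest != null && timestampMs - latest.end <= gapMs) {
            latest.end = timestampMs; // still within the inactivity gap: extend the session
            latest.count++;
        } else {
            pending.add(new Session(key, timestampMs)); // gap exceeded or first record: new session
        }

        // Assumed closing rule: release once stream time >= end + gap + grace.
        List<Session> closed = new ArrayList<>();
        Iterator<Session> it = pending.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.end + gapMs + graceMs <= streamTime) {
                closed.add(s);
                it.remove();
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        SessionCloseModel model = new SessionCloseModel(Duration.ofMinutes(5), Duration.ofMinutes(1));
        System.out.println(model.onRecord("alice", 0L));
        System.out.println(model.onRecord("alice", 60_000L));
        System.out.println(model.onRecord("bob", 300_000L));
        System.out.println(model.onRecord("bob", 420_000L));
    }
}
```

In this model, alice's session is only released by the last call, when bob's record pushes stream time to 420000 ms. If no further records arrived after alice's second event, her session would stay pending no matter how much wall-clock time passed, which may be related to why I never see a final tally.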