Kafka streams Session Window help

In a session window, I am unclear when I get the results. How do I know when the session for a particular key has ended? When do I get the results so that I can take the aggregated results and do further processing. For example … if I want to get the total count of a user’s errors. Each user would have their own session by the key. Once the 5 minute window and 30 second grace period has ended. I would expect to get the final event with the user data and the total count within that window. I only see the events as they happen within the window…not the final tally or when the window closed.

public void process(KStream<String, UserEvent> input) {
        input.peek((k,v) -> log.info("Key = " + k + " Create Time = "
            + Instant.ofEpochMilli(v.getCreatedTime()).atOffset(ZoneOffset.UTC)))
                .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
                .suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(5), Suppressed.BufferConfig.unbounded()))
                .foreach((k,v) -> log.info(
                        "UserID: " + k.key() +
                        " Window start: " + Instant.ofEpochMilli(k.window().start()).atOffset(ZoneOffset.UTC) +
                        " Window end: " + Instant.ofEpochMilli(k.window().end()).atOffset(ZoneOffset.UTC) +
                        " Count: " + v + " Window#: " + k.window().hashCode()));

My logs show events, but I expect to only see an event per key once the window+grace has expired…so that I can take the key and the summary and do further processing.

Hi @bran,

Based on what you have here, this looks like it should work. Have you confirmed the timestamps for your records? Maybe you can try this out with the TopologyTestDriver, which will allow you to set your timestamps per record, that way you can orchestrate the exact scenario you’re looking to achieve.