Kafka Session Windowing Suppress

KTable<Windowed<String>, MyClass> windowedStream = sessionKStream
        .selectKey((k, v) -> v.getId())
        .groupByKey()
        .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(2)))
        .reduce((v1, v2) -> v2, Materialized.with(Serdes.String(), new MyClass()));
// Hold back results until the session window closes.
KTable<Windowed<String>, MyClass> suppressedTable = windowedStream
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
// Drop the window from the key so the output is keyed by the plain id.
KStream<String, MyClass> processedStream = suppressedTable.toStream()
        .map((k, v) -> KeyValue.pair(k.key(), v));
// transformValues() returns a new stream; keep the result so the sink reads from it.
KStream<String, MyClass> finalStream = processedStream.transformValues(
        () -> new ValueTransformer<MyClass, MyClass>() {
            @Override
            public void init(ProcessorContext context) {
            }

            @Override
            public MyClass transform(MyClass value) {
                if (value.getStatus() != null && !value.getStatus().equals("FINAL")) {
                    value.setStatus("FINAL");
                }
                return value;
            }

            @Override
            public void close() {
            }
        });
finalStream.to("mytopic", Produced.with(Serdes.String(), new MyClass()));

At the end of the 2-minute window, the messages are not immediately produced to “mytopic”. Only after 4-5 more messages are produced to the input topic is a batch of messages produced to “mytopic”. linger.ms and batch.size are both set to zero, but the messages are still batched for some reason.

suppress() works on “stream-time”, not wall-clock time. “Stream-time” is based on the input record timestamps, so it advances only while data is flowing and input records with larger timestamps are processed. Only when “stream-time” advances can suppress() emit results.
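To make the difference from wall-clock time concrete, here is a minimal sketch of how stream-time behaves. It is a simplified model and not the Kafka Streams implementation; the class name StreamTimeTracker and its methods are hypothetical and exist only for this illustration.

```java
// Simplified model of stream-time; NOT Kafka Streams internals.
// StreamTimeTracker is a hypothetical name used only for this sketch.
class StreamTimeTracker {
    // Highest record timestamp observed so far; -1 means no record yet.
    private long streamTime = -1L;

    // Called once per processed record; returns stream-time afterwards.
    long observe(long recordTimestamp) {
        // Stream-time never moves backwards, even for out-of-order records.
        streamTime = Math.max(streamTime, recordTimestamp);
        return streamTime;
    }

    long streamTime() {
        return streamTime;
    }

    public static void main(String[] args) throws InterruptedException {
        StreamTimeTracker tracker = new StreamTimeTracker();
        tracker.observe(1_000L);
        tracker.observe(3_000L);

        // Wall-clock time passes, but no records arrive:
        // stream-time stays where the last record left it.
        Thread.sleep(50);
        System.out.println("after idle period: " + tracker.streamTime());

        // An out-of-order record does not move stream-time backwards.
        tracker.observe(2_000L);
        System.out.println("after late record: " + tracker.streamTime());

        // Only a record with a larger timestamp advances it.
        tracker.observe(9_000L);
        System.out.println("after new record:  " + tracker.streamTime());
    }
}
```

In this model nothing happens while the input is idle, which matches the observation in the question: results appear only once later records arrive.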


Is there some way to override this behaviour and emit the results as soon as the window ends?

“emit the results as soon as the window ends”

Sessions are event driven, so that’s not possible. Suppose the inactivity gap is 5 seconds and you get input events with timestamps 1, 2, and 3, and then no more events: you have a session [1,3] and stream-time is 3. Even if the next event arrives with ts=9, which means [1,3] is the session that will be closed, at stream-time 3 we don’t know what the next event will be. If the next event had ts=7 instead of ts=9, the session would be extended to [1,7] and would not close. Thus, we can only close the window when we see ts=9. Does this make sense? Otherwise we would need some magic power to look into the future.
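The example above can be replayed with a small sketch. This is a simplified, single-key model that assumes records arrive in timestamp order; SessionModel and its methods are hypothetical names, and the exact boundary handling in Kafka Streams may differ from the strict "greater than the gap" rule used here.

```java
import java.util.Optional;

// Simplified single-key session model; NOT Kafka Streams internals.
// Assumes in-order timestamps. SessionModel is a hypothetical name.
class SessionModel {
    private final long gap;
    private long start = -1L; // -1 means no open session yet
    private long end = -1L;

    SessionModel(long gap) {
        this.gap = gap;
    }

    // Feeds one record timestamp. Returns the session {start, end} that
    // this record closed, or empty if the open session was started/extended.
    Optional<long[]> add(long ts) {
        if (start < 0) {
            start = ts;
            end = ts;
            return Optional.empty();
        }
        if (ts - end > gap) {
            // The new record is too far away: only now do we know
            // that the previous session is complete.
            long[] closed = {start, end};
            start = ts;
            end = ts;
            return Optional.of(closed);
        }
        end = Math.max(end, ts); // within the gap: extend the session
        return Optional.empty();
    }

    long[] openSession() {
        return new long[] {start, end};
    }

    public static void main(String[] args) {
        SessionModel withNine = new SessionModel(5);
        withNine.add(1);
        withNine.add(2);
        withNine.add(3);
        // ts=9 is more than 5 after the session end (3): [1,3] closes.
        System.out.println("ts=9 closes a session: " + withNine.add(9).isPresent());

        SessionModel withSeven = new SessionModel(5);
        withSeven.add(1);
        withSeven.add(2);
        withSeven.add(3);
        // ts=7 is within the gap: the session is extended to [1,7].
        System.out.println("ts=7 closes a session: " + withSeven.add(7).isPresent());
    }
}
```

After ts=3 both runs are in exactly the same state, so there is nothing the model could use to decide early whether [1,3] is final.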

Note though: the advance of time is not based on keys. So if you have a session [1,3] for key=A, and a record with key=B and ts=9 comes in, it would also close the session of key A (assuming that A and B are in the same partition).
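A sketch of that last point, again as a simplified model and not the actual implementation: one stream-time is shared by all keys of a partition, so a record for key B can close the session of key A. PartitionSessionModel is a hypothetical name, and in-order timestamps are assumed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified per-partition model; NOT Kafka Streams internals.
// Assumes in-order timestamps. PartitionSessionModel is a hypothetical name.
class PartitionSessionModel {
    private final long gap;
    private long streamTime = -1L; // shared by every key in the partition
    private final Map<String, long[]> open = new LinkedHashMap<>();

    PartitionSessionModel(long gap) {
        this.gap = gap;
    }

    // Processes one record and returns the sessions it closed,
    // formatted as "key[start,end]".
    List<String> process(String key, long ts) {
        streamTime = Math.max(streamTime, ts);

        // Any key's session whose end is more than `gap` behind
        // stream-time is closed, regardless of which key advanced time.
        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<String, long[]>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, long[]> e = it.next();
            long[] s = e.getValue();
            if (streamTime - s[1] > gap) {
                closed.add(e.getKey() + "[" + s[0] + "," + s[1] + "]");
                it.remove();
            }
        }

        // Start or extend the session for the record's own key.
        long[] s = open.get(key);
        if (s == null) {
            open.put(key, new long[] {ts, ts});
        } else {
            s[1] = Math.max(s[1], ts);
        }
        return closed;
    }

    public static void main(String[] args) {
        PartitionSessionModel partition = new PartitionSessionModel(5);
        partition.process("A", 1);
        partition.process("A", 2);
        partition.process("A", 3);
        // A record for a different key advances the shared stream-time.
        System.out.println(partition.process("B", 9)); // prints [A[1,3]]
    }
}
```

If A and B were in different partitions, each would be tracked by a separate instance of this model, and the record for B would not affect A's session.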