KTable<Windowed<String>, MyClass> windowedStream = sessionKStream
.selectKey((k, v) -> v.getId())
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(2)))
.reduce((v1, v2) -> v2, Materialized.with(Serdes.String(), new MyClass()));
KTable<Windowed<String>, MyClass> suppressedTable = windowedStream.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
KStream<String, MyClass> processedStream = suppressedTable.toStream().map((k, v) -> KeyValue.pair(k.key(), v));
processedStream.transformValues(() -> new ValueTransformer<>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public MyClass transform(MyClass value) {
if (value.getStatus() != null && !value.getStatus().equals("FINAL")) {
value.setStatus("FINAL");
}
return value;
}
@Override
public void close() {
}
});
processedStream.to("mytopic", Produced.with(Serdes.String(), new MyClass()));
At the end of window time of 2 minutes, the messages are not immediately produced to “mytopic”. Only if 4-5 messages ate produced in the input topic, only then a batch of messages are produced to the “mytopic”. The linger ms and batch size is also set to zero. But, still the messages are batched for some reason.