Session Window Aggregation in Kafka Streams DSL

I am creating a Kafka Streams application to statefully aggregate a set of events. I added a log statement in the adder function and can see the required aggregated result there, but the output of the aggregation is not being sent to the Kafka output topic. The code for the topology file is as follows:
"
package io.confluent.developer;

import io.confluent.developer.avro.VideoEvents;
import io.confluent.developer.avro.AggregatedPlayEvents;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.streams.kstream.Materialized;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
import org.apache.kafka.streams.KeyValue;

public class VideoStreamApplication {
private static final Logger logger = LoggerFactory.getLogger(VideoStreamApplication.class);
public Topology buildTopology(Properties allProps) {
    final StreamsBuilder builder = new StreamsBuilder();

    final Serde<String> stringSerde = Serdes.String();
    final Serde<VideoEvents> videoEventsSerde = getSpecificAvroSerde(allProps);
    final Serde<AggregatedPlayEvents> aggregatedPlayEventsSerde = getSpecificAvroSerde(allProps);

    final String videoTopic = allProps.getProperty("input.topic.name");
    final String outputTopic = allProps.getProperty("output.topic.name");

    Consumed<String, VideoEvents> videoEventsConsumerOptions =
            Consumed.with(stringSerde,videoEventsSerde)
                    .withTimestampExtractor(new EventTimeExtractor());

    KStream<String, VideoEvents> videoEvents = builder.stream(videoTopic, videoEventsConsumerOptions);

    SessionWindows sessionWindow = SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(30));

            videoEvents
                    .groupByKey()
                    .windowedBy(sessionWindow)
                    .aggregate(
                            VideoEventsAggregator::newObject,
                            VideoEventsAggregator::add,
                            VideoEventsAggregator::merge,
                            Materialized.<String, AggregatedPlayEvents, SessionStore<Bytes, byte[]>>as("videoEventStore")
                                    .withKeySerde(stringSerde)
                                    .withValueSerde(aggregatedPlayEventsSerde)
                    )
                    .suppress(untilWindowCloses(unbounded()))
                    .toStream()
                    .map((windowKey, value) -> KeyValue.pair(windowKey.key(),value))
                    .to(outputTopic,Produced.with(stringSerde,aggregatedPlayEventsSerde))
            ;

    return builder.build();
}
static <T extends SpecificRecord> SpecificAvroSerde<T> getSpecificAvroSerde(final Properties allProps) {
    final SpecificAvroSerde<T> specificAvroSerde = new SpecificAvroSerde<>();

    final Map<String, String> serdeConfig = (Map)allProps;
    specificAvroSerde.configure(serdeConfig, false);
    return specificAvroSerde;
}
public Properties loadEnvProperties(String fileName) throws IOException {
    Properties allProps = new Properties();
    FileInputStream input = new FileInputStream(fileName);
    allProps.load(input);
    input.close();

    return allProps;
}

public static void main(String[] args) throws Exception {
    if (args.length < 1) {
        throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
    }

    VideoStreamApplication app = new VideoStreamApplication();

    Properties allProps = app.loadEnvProperties(args[0]);
    allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    allProps.put(SCHEMA_REGISTRY_URL_CONFIG, allProps.getProperty("schema.registry.url"));
    // allProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimeExtractor.class.getName());
    Topology topology = app.buildTopology(allProps);

    final KafkaStreams streams = new KafkaStreams(topology, allProps);
    final CountDownLatch latch = new CountDownLatch(1);

    // Attach shutdown handler to catch Control-C.
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close(Duration.ofSeconds(5));
            latch.countDown();
        }
    });

    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}

}
" ?

How do you know that you get the right result if it’s not written to the topic?
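One way to answer that deterministically is to run the topology through TopologyTestDriver (from the kafka-streams-test-utils artifact) instead of watching live topics. This is only a rough sketch: the topic names, the mock schema registry scope, and the VideoEvents contents are placeholders you would need to adapt, and it assumes the class sits in the same package as VideoStreamApplication (the serde helper is package-private).

package io.confluent.developer;

import io.confluent.developer.avro.AggregatedPlayEvents;
import io.confluent.developer.avro.VideoEvents;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import java.time.Instant;
import java.util.Properties;

public class SessionAggregationSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-agg-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put("schema.registry.url", "mock://session-agg-test"); // in-memory mock registry
        props.put("input.topic.name", "video-events");               // assumed topic name
        props.put("output.topic.name", "aggregated-play-events");    // assumed topic name

        Serde<VideoEvents> videoSerde = VideoStreamApplication.getSpecificAvroSerde(props);
        Serde<AggregatedPlayEvents> aggSerde = VideoStreamApplication.getSpecificAvroSerde(props);

        try (TopologyTestDriver driver =
                     new TopologyTestDriver(new VideoStreamApplication().buildTopology(props), props)) {
            TestInputTopic<String, VideoEvents> input = driver.createInputTopic(
                    "video-events", new StringSerializer(), videoSerde.serializer());
            TestOutputTopic<String, AggregatedPlayEvents> output = driver.createOutputTopic(
                    "aggregated-play-events", new StringDeserializer(), aggSerde.deserializer());

            VideoEvents event = new VideoEvents(); // hypothetical: populate fields per your Avro schema

            // Two events 10s apart land in one session (the inactivity gap is 60s).
            // Caveat: your EventTimeExtractor reads time from the payload, so the field
            // it extracts must carry these timestamps too.
            input.pipeInput("user-1", event, Instant.ofEpochMilli(0));
            input.pipeInput("user-1", event, Instant.ofEpochMilli(10_000));

            // A much later record on another key advances stream time past the session
            // end (10s) plus grace (30s), so suppress() can emit the closed session.
            input.pipeInput("user-2", event, Instant.ofEpochMilli(120_000));

            output.readKeyValuesToList().forEach(kv ->
                    System.out.println(kv.key + " -> " + kv.value));
        }
    }
}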

Also, you are using .suppress(untilWindowCloses(unbounded())), so could it be that your input data stops and the window is never closed? Note that suppress() works on event time, and if the data flow stops, event time no longer advances.
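If that is what is happening, a common workaround is to send a dummy "heartbeat" record with a sufficiently advanced timestamp, so that stream time moves past the window end plus the grace period and suppress() can emit. A rough sketch follows; the bootstrap servers, schema registry URL, topic name, and the heartbeat payload are all assumptions to adapt to your setup.

package io.confluent.developer;

import io.confluent.developer.avro.VideoEvents;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Properties;

public class HeartbeatProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");              // assumption

        // A timestamp beyond the last real event + 60s gap + 30s grace closes any
        // open session once this record is processed.
        long lastEventTimestamp = System.currentTimeMillis(); // assumption: your latest event time
        long advancedTs = lastEventTimestamp + Duration.ofSeconds(60 + 30 + 1).toMillis();

        try (KafkaProducer<String, VideoEvents> producer = new KafkaProducer<>(props)) {
            VideoEvents heartbeat = new VideoEvents(); // hypothetical: fill fields per your schema
            // Two caveats: (1) your custom EventTimeExtractor reads time from the payload,
            // so the field it extracts must also carry advancedTs; (2) stream time advances
            // per partition, so the heartbeat must land in the same partition as the stalled key.
            producer.send(new ProducerRecord<>("video-events", null, advancedTs, "heartbeat", heartbeat));
            producer.flush();
        }
    }
}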


OK, I tried commenting out the suppress() call and changed my topology to the following:

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();

        final Serde<String> stringSerde = Serdes.String();
        final Serde<VideoEvents> videoEventsSerde = getSpecificAvroSerde(allProps);
        final Serde<AggregatedPlayEvents> aggregatedPlayEventsSerde = getSpecificAvroSerde(allProps);

        final String videoTopic = allProps.getProperty("input.topic.name");
        final String outputTopic = allProps.getProperty("output.topic.name");

        Consumed<String, VideoEvents> videoEventsConsumerOptions =
                Consumed.with(stringSerde,videoEventsSerde)
                        .withTimestampExtractor(new EventTimeExtractor());

        KStream<String, VideoEvents> videoEvents = builder.stream(videoTopic, videoEventsConsumerOptions);

        SessionWindows sessionWindow = SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(30));

                videoEvents
                        .groupByKey()
                        .windowedBy(sessionWindow)
                        .aggregate(
                                VideoEventsAggregator::newObject,
                                VideoEventsAggregator::add,
                                VideoEventsAggregator::merge,
                                Materialized.<String, AggregatedPlayEvents, SessionStore<Bytes, byte[]>>as("videoEventStore")
                                        .withKeySerde(stringSerde)
                                        .withValueSerde(aggregatedPlayEventsSerde)
                        )
//                        .suppress(untilWindowCloses(unbounded()))
                        .toStream()
                        .peek((K,V) -> logger.info("-----------------------------------------------------------------\nKey and value are : \n{},{}\n-----------------------------------------------------------------",K,V))
                        .map((windowKey, value) -> {
                            logger.info("=================================================================");
                            logger.info("Key and value are : \n{},{}",windowKey,value);
                            logger.info("Key value pair data is : {} ",KeyValue.pair(windowKey.key(),value));
                            logger.info("Key ");
                            logger.info("=================================================================");
                            return KeyValue.pair(windowKey.key(),value);
                        })
                        .to(outputTopic,Produced.with(stringSerde,aggregatedPlayEventsSerde))
                ;

        return builder.build();
    }

I am able to see the log output in the map() step, but the events are still not being pushed to the output topic.

Hard to say… What are the timestamps in your input data, and what is your output topic's retention time? Given that you see the records in the map() after the aggregation, the data should get into the output topic. Did you check whether the output topic's (begin/end) offsets change?
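If you want to check the begin/end offsets programmatically rather than with the CLI tools, a small AdminClient snippet like this works (the bootstrap servers and the output topic name are assumptions):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class OutputTopicOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        String topic = "aggregated-play-events";                                 // assumption

        try (Admin admin = Admin.create(props)) {
            int partitions = admin.describeTopics(Collections.singletonList(topic))
                    .all().get().get(topic).partitions().size();
            for (int p = 0; p < partitions; p++) {
                TopicPartition tp = new TopicPartition(topic, p);
                long earliest = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.earliest()))
                        .partitionResult(tp).get().offset();
                long latest = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                        .partitionResult(tp).get().offset();
                // If the latest offset never moves, nothing is being written to this partition.
                System.out.printf("%s-%d: earliest=%d latest=%d%n", topic, p, earliest, latest);
            }
        }
    }
}

If the end offsets advance while your consumer still shows nothing, the problem is on the consuming side (for example, reading from latest only); if they never move, the records really are not reaching the topic.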