I am creating a Kafka streams application to aggregate a set of events statefully,
the code for the topology,
I added the log in the adder function, and able to get required aggregated result, but the output of the aggregation is not being sent to the kafka-output topic, I am adding the code for the topology file which is as follows:
"
package io.confluent.developer;
import io.confluent.developer.avro.VideoEvents;
import io.confluent.developer.avro.AggregatedPlayEvents;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.streams.kstream.Materialized;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
import org.apache.kafka.streams.KeyValue;
public class VideoStreamApplication {
private static final Logger logger = LoggerFactory.getLogger(VideoStreamApplication.class);
public Topology buildTopology(Properties allProps) {
final StreamsBuilder builder = new StreamsBuilder();
final Serde<String> stringSerde = Serdes.String();
final Serde<VideoEvents> videoEventsSerde = getSpecificAvroSerde(allProps);
final Serde<AggregatedPlayEvents> aggregatedPlayEventsSerde = getSpecificAvroSerde(allProps);
final String videoTopic = allProps.getProperty("input.topic.name");
final String outputTopic = allProps.getProperty("output.topic.name");
Consumed<String, VideoEvents> videoEventsConsumerOptions =
Consumed.with(stringSerde,videoEventsSerde)
.withTimestampExtractor(new EventTimeExtractor());
KStream<String, VideoEvents> videoEvents = builder.stream(videoTopic, videoEventsConsumerOptions);
SessionWindows sessionWindow = SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(30));
videoEvents
.groupByKey()
.windowedBy(sessionWindow)
.aggregate(
VideoEventsAggregator::newObject,
VideoEventsAggregator::add,
VideoEventsAggregator::merge,
Materialized.<String,AggregatedPlayEvents,SessionStore<Bytes,byte[]>>as("videoEventStore").withKeySerde(stringSerde).withValueSerde(aggregatedPlayEventsSerde)
)
.suppress(untilWindowCloses(unbounded()))
.toStream()
.map((windowKey, value) -> KeyValue.pair(windowKey.key(),value))
.to(outputTopic,Produced.with(stringSerde,aggregatedPlayEventsSerde))
;
return builder.build();
}
static <T extends SpecificRecord> SpecificAvroSerde<T> getSpecificAvroSerde(final Properties allProps) {
final SpecificAvroSerde<T> specificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = (Map)allProps;
specificAvroSerde.configure(serdeConfig, false);
return specificAvroSerde;
}
public Properties loadEnvProperties(String fileName) throws IOException {
Properties allProps = new Properties();
FileInputStream input = new FileInputStream(fileName);
allProps.load(input);
input.close();
return allProps;
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
}
VideoStreamApplication app = new VideoStreamApplication();
Properties allProps = app.loadEnvProperties(args[0]);
allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
allProps.put(SCHEMA_REGISTRY_URL_CONFIG, allProps.getProperty("schema.registry.url"));
// allProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimeExtractor.class.getName());
Topology topology = app.buildTopology(allProps);
final KafkaStreams streams = new KafkaStreams(topology, allProps);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close(Duration.ofSeconds(5));
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
" ?