Hi, I’m very new to Kafka and I’m taking the Confluent 101 courses.
I was going through the aggregation lesson, Kafka Streams Aggregations - Hands On, and ran into some issues.
I prepared my code for the aggregation:
package io.confluent.developer.aggregate;

import io.confluent.developer.StreamsUtils;
import io.confluent.developer.avro.ElectronicOrder;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class StreamsAggregate {

    public static void main(String[] args) throws IOException {
        final Properties streamsProps = StreamsUtils.loadProperties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-streams");

        StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = streamsProps.getProperty("aggregate.input.topic");
        final String outputTopic = streamsProps.getProperty("aggregate.output.topic");

        final Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
        final SpecificAvroSerde<ElectronicOrder> electronicSerde =
                StreamsUtils.getSpecificAvroSerde(configMap);

        // Log every record as it comes off the input topic.
        final KStream<String, ElectronicOrder> electronicStream =
                builder.stream(inputTopic, Consumed.with(Serdes.String(), electronicSerde))
                        .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));

        // Sum order prices per key and write the running totals to the output topic.
        electronicStream.groupByKey()
                .aggregate(() -> 0.0,
                        (key, order, total) -> total + order.getPrice(),
                        Materialized.with(Serdes.String(), Serdes.Double()))
                .toStream()
                .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));

        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps)) {
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                kafkaStreams.close(Duration.ofSeconds(2));
                shutdownLatch.countDown();
            }));
            // Produce the sample ElectronicOrder records, then start the Streams app.
            TopicLoader.runProducer();
            try {
                kafkaStreams.start();
                shutdownLatch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
        }
        System.exit(0);
    }
}
but when I run ./gradlew runStreams -Pargs=aggregate in my terminal, the only output I get is:
> Configure project :
Using example io.confluent.developer.aggregate.StreamsAggregate
> Task :runStreams
Record produced - offset - 24 timestamp - 1695216441194
Record produced - offset - 25 timestamp - 1695216451194
Record produced - offset - 26 timestamp - 1695216461194
Record produced - offset - 27 timestamp - 1695216473194
when it was supposed to output lines like:
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622149038018}...
To be sure I didn’t mess up my code, I copied and pasted the solution, and it still shows the same results.
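To rule out the app never actually starting, I’m also thinking of adding a state listener right before kafkaStreams.start() (this is just my own debugging addition, not part of the course code):

// Debug only: print every state transition. If the app never reaches RUNNING,
// the problem is startup/connectivity rather than the aggregation itself.
kafkaStreams.setStateListener((newState, oldState) ->
        System.out.println("Streams state changed from " + oldState + " to " + newState));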
I disabled my VPN, so that is not the problem.
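I was also going to check the input topic directly with kafka-console-consumer, just to confirm that the records from TopicLoader really land there (the broker address, config file, and topic name below are placeholders; mine come from the properties file that StreamsUtils loads):

kafka-console-consumer --bootstrap-server <my-bootstrap-server> \
  --consumer.config <my-client-properties-file> \
  --topic <value of aggregate.input.topic> \
  --from-beginning

Since the values are Avro-encoded I’d only expect garbled bytes there, but that would at least confirm the records exist.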
Does anyone have any idea how to fix this? Thanks!!