Hello, I’m running the code created in the Hands On: Basic Operations exercise, and it’s reporting this error below
training@training:~$ cd learn-kafka-courses/kafka-streams
training@training:~/learn-kafka-courses/kafka-streams$ ./gradlew runStreams -Pargs=basic
Starting a Gradle Daemon (subsequent builds will be faster)
Configure project :
Using example io.confluent.developer.basic.BasicStreams
Task :compileJava FAILED
FAILURE: Build failed with an exception.
- What went wrong:
Execution failed for task ‘:compileJava’.
Duplicate key io.confluent.developer.basic.solution.BasicStreams (attempted merging values io/confluent/developer/basic/solution/BasicStreams.java and io/confluent/developer/basic/BasicStreams.java)
- Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.
The code I used follows below:
package io.confluent.developer.basic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class BasicStreams {

    /**
     * Entry point for the basic Kafka Streams example.
     *
     * <p>Builds a topology that reads string records from {@code basic.input.topic},
     * keeps only values containing the {@code "orderNumber-"} marker, strips that
     * prefix, keeps order numbers greater than 1000, and writes the results to
     * {@code basic.output.topic}. Runs until the JVM receives a shutdown signal.
     *
     * @param args ignored
     * @throws IOException if {@code src/main/resources/streams.properties} cannot be read
     */
    public static void main(String[] args) throws IOException {
        Properties streamsProps = new Properties();
        try (FileInputStream fis = new FileInputStream("src/main/resources/streams.properties")) {
            streamsProps.load(fis);
        }
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "basic-streams");

        StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = streamsProps.getProperty("basic.input.topic");
        final String outputTopic = streamsProps.getProperty("basic.output.topic");
        final String orderNumberStart = "orderNumber-";

        KStream<String, String> firstStream =
                builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        firstStream.peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value))
                .filter((key, value) -> value.contains(orderNumberStart))
                // Strip everything up to and including the "orderNumber-" marker the
                // filter above matched on, instead of relying on the first '-' in the
                // value (which could belong to unrelated text before the marker).
                .mapValues(value -> value.substring(value.indexOf(orderNumberStart) + orderNumberStart.length()))
                // Guard the parse: a malformed order number would otherwise throw
                // NumberFormatException inside the topology and kill the stream thread.
                .filter((key, value) -> isOrderNumberAbove(value, 1000L))
                .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps)) {
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            // Close the streams app gracefully (2s grace period) on Ctrl-C / SIGTERM,
            // then release the latch so main can finish.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                kafkaStreams.close(Duration.ofSeconds(2));
                shutdownLatch.countDown();
            }));
            TopicLoader.runProducer();
            try {
                kafkaStreams.start();
                shutdownLatch.await();
            } catch (Throwable e) {
                // Report the failure before exiting — exiting silently makes
                // startup problems impossible to diagnose.
                e.printStackTrace();
                System.exit(1);
            }
        }
        System.exit(0);
    }

    /**
     * Returns {@code true} when {@code value} parses as a {@code long} strictly
     * greater than {@code threshold}; malformed values are treated as not matching.
     *
     * @param value     candidate order-number text (digits expected)
     * @param threshold exclusive lower bound
     * @return whether the parsed value exceeds the threshold
     */
    private static boolean isOrderNumberAbove(String value, long threshold) {
        try {
            return Long.parseLong(value) > threshold;
        } catch (NumberFormatException ignored) {
            // Malformed order numbers are simply filtered out of the stream.
            return false;
        }
    }
}