I am at a complete loss as I cannot find the answer to my question anywhere. I am consuming this Avro object with my Kafka Streams app
{"num_enterprise_txn_entity_edition": 1,
"num_enterprise_txn_entity_version": 1,
"dte_event_occurred": "2022-06-30T18:42:49.533301Z",
"nme_creator": "AvroProducer",
"nme_event_type": "ReadyToSubmit"}
I need my Kafka Streams app to transform the record so it looks like this
{"num_enterprise_txn_entity_edition": 1,
"num_enterprise_txn_entity_version": 1,
"dte_event_occurred": "2022-06-30T18:42:49.533301Z",
"nme_creator": "KafkaStreamsApp",
"nme_event_type": "NewSubmission"}
My code looks like this. Please tell me what I am doing wrong. I have a feeling there’s something wrong with the .mapValues()
import java.io.IOException;
import java.io.InputStream;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import javax.xml.bind.helpers.ValidationEventLocatorImpl;

import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

import com.kinsaleins.avro.POCEntity;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
/**
 * Kafka Streams application that consumes {@code POCEntity} Avro records,
 * keeps only "ReadyToSubmit" events, and republishes each one as a
 * "NewSubmission" event stamped with the current UTC time and this app as
 * the creator.
 */
public class streams {

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        // try-with-resources: the original leaked this classpath stream
        try (InputStream in =
                streams.class.getClassLoader().getResourceAsStream("kafkastream.properties")) {
            properties.load(in);
        }
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

        final String inputTopic = properties.getProperty("producer.send.topic");
        final String outputTopic = "SubmissionsTopic";

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, POCEntity> firstStream = builder.stream(inputTopic);

        firstStream
                .peek((key, value) -> System.out.println("Value " + value))
                // constant-first equals() avoids an NPE if a record has no event type
                .filter((key, value) -> "ReadyToSubmit".equals(value.getNmeEventType()))
                .mapValues(pocEntity ->
                        // BUG FIX: newBuilder() is a static factory on the generated Avro
                        // class; the original called it through `new POCEntity()`, which
                        // first ran the no-arg constructor on an uninitialized record.
                        POCEntity.newBuilder()
                                .setIdtEnterpriseTxnEntity(pocEntity.getIdtEnterpriseTxnEntity())
                                .setNumEnterpriseTxnEntityEdition(
                                        pocEntity.getNumEnterpriseTxnEntityEdition())
                                .setNumEnterpriseTxnEntityVersion(
                                        pocEntity.getNumEnterpriseTxnEntityVersion())
                                .setDteEventOccurred(ZonedDateTime.now(ZoneOffset.UTC)
                                        .format(DateTimeFormatter.ISO_INSTANT))
                                // BUG FIX: the desired output is "KafkaStreamsApp",
                                // not "StreamsApp"
                                .setNmeCreator("KafkaStreamsApp")
                                .setNmeEventType("NewSubmission")
                                .build())
                .peek((key, value) -> System.out.println("Value " + value))
                .to(outputTopic);

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        // close the topology cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
        kafkaStreams.start();
    }
}