Kafka Streams mapValues isn't working for me, please help ASAP

I am at a complete loss, as I cannot find the answer to my question anywhere. I am consuming this Avro object with my Kafka Streams app:

{"num_enterprise_txn_entity_edition": 1, 
  "num_enterprise_txn_entity_version": 1, 
  "dte_event_occurred": "2022-06-30T18:42:49.533301Z", 
  "nme_creator": "AvroProducer", 
  "nme_event_type": "ReadyToSubmit"}

I need my Kafka Streams app to transform the record so it looks like this:

{"num_enterprise_txn_entity_edition": 1, 
  "num_enterprise_txn_entity_version": 1, 
  "dte_event_occurred": "2022-06-30T18:42:49.533301Z", 
  "nme_creator": "KafkaStreamsApp", 
  "nme_event_type": "NewSubmission"}

My code looks like this. Please tell me what I am doing wrong. I have a feeling there’s something wrong with the .mapValues()

import java.io.IOException;
import java.io.InputStream;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import com.kinsaleins.avro.POCEntity;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

public class streams {

    public static void main(String[] args) throws IOException {

        Properties properties = new Properties();

        InputStream in = streams.class.getClassLoader().getResourceAsStream("kafkastream.properties");
        properties.load(in);

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

        final String inputTopic = properties.getProperty("producer.send.topic");
        final String outputTopic = "SubmissionsTopic";

        StreamsBuilder builder = new StreamsBuilder();


        KStream<String, POCEntity> firstStream = builder.stream(inputTopic);
        firstStream.peek((key, value) -> System.out.println("Value " + value))
            .filter((key, value) -> value.getNmeEventType().equals("ReadyToSubmit"))
            .mapValues((ValueMapper<POCEntity, POCEntity>) pocEntity -> {
                POCEntity pocEntity1 = POCEntity.newBuilder()
                    .setIdtEnterpriseTxnEntity(pocEntity.getIdtEnterpriseTxnEntity())
                    .setNumEnterpriseTxnEntityEdition(pocEntity.getNumEnterpriseTxnEntityEdition())
                    .setNumEnterpriseTxnEntityVersion(pocEntity.getNumEnterpriseTxnEntityVersion())
                    .setDteEventOccurred(ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
                    .setNmeCreator("StreamsApp")
                    .setNmeEventType("NewSubmission").build();

                return pocEntity1;
            })
            .peek((key, value) -> System.out.println("Value " + value))
            .to(outputTopic);

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        kafkaStreams.start();

    }

}

What is the issue? “It does not work” is too generic to provide help… Do you get the wrong output? Do you get no output? Does your app crash (and if so, what is the error)?

The only thing I noticed in your code snippet is that there is no shutdown hook, as recommended (cf. “How to build your first Apache Kafka Streams application using Confluent”).
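
For reference, here is a minimal sketch of the shutdown-hook pattern mentioned above, using only JDK classes. The names `ShutdownHookSketch` and `closeOnShutdown` are hypothetical, and the `AutoCloseable` parameter is a stand-in for the `KafkaStreams` instance (which implements `AutoCloseable`), so this compiles without Kafka on the classpath. It is an illustration of the idea, not the tutorial's exact code.

```java
import java.util.concurrent.CountDownLatch;

public class ShutdownHookSketch {

    // Builds (but does not register) a hook thread that closes the app
    // and then releases the latch a main thread may be waiting on.
    static Thread closeOnShutdown(AutoCloseable app, CountDownLatch latch) {
        return new Thread(() -> {
            try {
                app.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                latch.countDown();
            }
        }, "streams-shutdown-hook");
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);

        // Stand-in for the KafkaStreams instance (assumption for this demo).
        AutoCloseable app = () -> System.out.println("closing streams app");

        Runtime.getRuntime().addShutdownHook(closeOnShutdown(app, latch));

        // A real app would call kafkaStreams.start() and then latch.await()
        // here, blocking until Ctrl-C or SIGTERM. This demo simply returns,
        // so the hook runs when the JVM exits.
    }
}
```

In the posted `main`, the equivalent would be to register the hook with `kafkaStreams` as the `app` argument before calling `kafkaStreams.start()`.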

Basically mapValues wasn’t transforming my record the way I wanted it to but I got it to work eventually!
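
The thread does not record what the fix was. Comparing the posted code with the desired output, two things stand out: the code sets the creator to "StreamsApp" rather than "KafkaStreamsApp", and it overwrites `dte_event_occurred` with the current time, while the desired record keeps the original timestamp. Below is a sketch of the transformation as described in the question (keep every field, change only the creator and the event type), written as a plain function. `Event` and `MapValuesSketch` are hypothetical stand-ins for the Avro-generated `POCEntity`, so the example runs without Kafka or Avro. The `contentEquals` comparison is an assumption worth checking in the real app: Avro-generated getters return `CharSequence` unless the string type is configured as `String`, and `equals` between Avro's `Utf8` and a `String` is false.

```java
public class MapValuesSketch {

    // Hypothetical stand-in for the Avro-generated POCEntity.
    static final class Event {
        final int edition;
        final int version;
        final CharSequence occurred;
        final CharSequence creator;
        final CharSequence eventType;

        Event(int edition, int version, CharSequence occurred,
              CharSequence creator, CharSequence eventType) {
            this.edition = edition;
            this.version = version;
            this.occurred = occurred;
            this.creator = creator;
            this.eventType = eventType;
        }
    }

    // Filter predicate: compares by content, so it works whether the
    // field is a String or some other CharSequence implementation.
    static boolean isReadyToSubmit(Event e) {
        return e.eventType != null && "ReadyToSubmit".contentEquals(e.eventType);
    }

    // Value mapper: copies every field and overrides only two of them.
    static Event toNewSubmission(Event in) {
        return new Event(in.edition, in.version, in.occurred,
                "KafkaStreamsApp", "NewSubmission");
    }

    public static void main(String[] args) {
        // StringBuilder simulates a non-String CharSequence field.
        Event in = new Event(1, 1, "2022-06-30T18:42:49.533301Z",
                new StringBuilder("AvroProducer"),
                new StringBuilder("ReadyToSubmit"));

        if (isReadyToSubmit(in)) {
            Event out = toNewSubmission(in);
            System.out.println(out.creator + " " + out.eventType + " " + out.occurred);
        }
    }
}
```

In the topology these would be passed as `.filter((key, value) -> isReadyToSubmit(value))` and `.mapValues(MapValuesSketch::toNewSubmission)`, with `POCEntity` in place of `Event`. With the real generated class, the copy could likely start from `POCEntity.newBuilder(pocEntity)` so that only the two changed fields need setters.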

Regarding the shutdown hook, is that still necessary even with a Spring Streams app? I recall reading Spring takes care of all the lifecycle stuff for you.

Not sure what Spring does. Could be.

Glad you figured it out!
