Test KStream with Protobuf - java.lang.NullPointerException: Cannot invoke "io.confluent.kafka.serializers.subject.strategy.ReferenceSubjectNameStrategy.subjectName(String, String, boolean, io.confluent.kafka.schemaregistry.ParsedSchema)" because "strateg

Hi,

I am working on a function to process a Stream and remove events when the key exists in the GlobalKTable.

It needs to be a generic function because it will process different topics/events.

The application is working fine, but I am facing the error below with the test myTopology_processEventsProto_shouldConsumeFilterAndPublish.

line: inputTopic.pipeInput(record);

java.lang.NullPointerException: Cannot invoke “io.confluent.kafka.serializers.subject.strategy.ReferenceSubjectNameStrategy.subjectName(String, String, boolean, io.confluent.kafka.schemaregistry.ParsedSchema)” because “strategy” is null

I tried to find an example that tests a function using <KStream<String, com.google.protobuf.Message>, but I didn’t find it.

// Topology.java

@Bean
public BiFunction<KStream<String, com.google.protobuf.Message>, GlobalKTable<String, CustomerConsentProto.CustomerConsent>, KStream<String, com.google.protobuf.Message>> processEventsProto() {

    return (stream, table) -> (
            stream
                .map((key, value) -> KeyValue.pair(key.trim(), value))
                .peek((key, value) -> log.info("******Received event key={} value={}", key, value.toString()))
                .leftJoin(table, (key, value) -> key, (streamRecord, tableRecord) -> {
                    if (tableRecord == null){
                        return streamRecord;
                    }
                    return null;
                } )
                .peek((key, value) -> log.info("The customerId={} gave the consent={}", key, Optional.ofNullable(value).isPresent())
                )
                .filter((key, value) -> Optional.ofNullable(value).isPresent())
                .peek((key, value) -> log.info("The customerId={} will go to the output", key))

    );
}

// TopologyTest.java

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = MyTopology.class)
@ImportAutoConfiguration()

@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
@ExtendWith(MockitoExtension.class)
public class MyTopologyTest {

    private static final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();

    private static final Serde<String> stringSerde = Serdes.String();

    private static final KafkaProtobufSerde<com.google.protobuf.Message> kafkaProtobufSerde = new KafkaProtobufSerde<>(schemaRegistryClient);

    private KafkaProtobufSerde<CustomerConsentProto.CustomerConsent> protobufSerdeCustomerConsent() {
        KafkaProtobufSerde<CustomerConsentProto.CustomerConsent> serde = new KafkaProtobufSerde<>(schemaRegistryClient, CustomerConsentProto.CustomerConsent.class);
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://fake");
        serde.configure(serdeConfig, false);
        return serde;
    }


    @Test
    void myTopology_processEventsProto_shouldConsumeFilterAndPublish() {

        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Parameter 1 // kafkaProtobufSerde
        KStream<String, com.google.protobuf.Message> inputEventStream = streamsBuilder.stream("inputEvent.proto", Consumed.with(stringSerde, kafkaProtobufSerde));

        // Parameter 2
        String globalTableTopic = "customer-consent.proto";
        String globalStore = "globalStore";
        GlobalKTable<String, CustomerConsentProto.CustomerConsent> consentGlobalTable = streamsBuilder.globalTable(globalTableTopic, Consumed.with(stringSerde, protobufSerdeCustomerConsent()), Materialized.as(Stores.inMemoryKeyValueStore(globalStore)));

        // Creating function
        BiFunction<KStream<String, com.google.protobuf.Message>, GlobalKTable<String, CustomerConsentProto.CustomerConsent>, KStream<String, com.google.protobuf.Message>> processStream = new MyTopology().processEventsProto();

        KStream<String, com.google.protobuf.Message> customerConsentEventKStream = processStream.apply( inputEventStream, consentGlobalTable);

        // Parameter 3 - output
        customerConsentEventKStream.to("outputEvent.proto", Produced.with(stringSerde, kafkaProtobufSerde));

        try (TopologyTestDriver testDriver = new TopologyTestDriver(streamsBuilder.build())) {
            TestInputTopic<String, com.google.protobuf.Message> inputTopic = testDriver.createInputTopic(
                "inputEvent.proto",
                stringSerde.serializer(),
                kafkaProtobufSerde.serializer());

            TestInputTopic<String, CustomerConsentProto.CustomerConsent> globalKTableTopic = testDriver.createInputTopic(
                "customer-consent.proto",
                stringSerde.serializer(),
                protobufSerdeCustomerConsent().serializer());

            TestOutputTopic<String, com.google.protobuf.Message> outputTopic = testDriver.createOutputTopic(
                "outputEvent.proto",
                stringSerde.deserializer(),
                kafkaProtobufSerde.deserializer()
            );

            // Event to the Parameter 1
            String key = "121380835";
            com.google.protobuf.Message inputEvent = getInputEvent(key);
            TestRecord<String, com.google.protobuf.Message> record = new TestRecord<>(key, inputEvent);

            record.headers();
            inputTopic.pipeInput(record);

            // Event to the Parameter 2
            String key2 = "121380840";
            CustomerConsentProto.CustomerConsent consentUpdatedEvent = getConsent(key);
            TestRecord<String, CustomerConsentProto.CustomerConsent> recordGlobalTable = new TestRecord<>(key2, consentUpdatedEvent);

            recordGlobalTable.headers();
            globalKTableTopic.pipeInput(recordGlobalTable);

            // Checking output
            Assertions.assertThat(outputTopic.isEmpty()).isFalse();
            InputEventProto.InputEvent response = (InputEventProto.InputEvent) outputTopic.readValue();
            Assertions.assertThat(inputEvent).isEqualTo(response);

        }

    }

}

Within your serdeConfig map have you tried defining the subject naming strategy?