Hi,
I am working on a function to process a Stream and remove events when the key exists in the GlobalKTable.
It needs to be a generic function because it will process different topics/events.
The application is working fine, but I am facing the error below with the test myTopology_processEventsProto_shouldConsumeFilterAndPublish.
line: inputTopic.pipeInput(record);
java.lang.NullPointerException: Cannot invoke “io.confluent.kafka.serializers.subject.strategy.ReferenceSubjectNameStrategy.subjectName(String, String, boolean, io.confluent.kafka.schemaregistry.ParsedSchema)” because “strategy” is null
I tried to find an example that tests a function using <KStream<String, com.google.protobuf.Message>, but I didn’t find it.
// Topology.java
@Bean
public BiFunction<KStream<String, com.google.protobuf.Message>, GlobalKTable<String, CustomerConsentProto.CustomerConsent>, KStream<String, com.google.protobuf.Message>> processEventsProto() {
return (stream, table) -> (
stream
.map((key, value) -> KeyValue.pair(key.trim(), value))
.peek((key, value) -> log.info("******Received event key={} value={}", key, value.toString()))
.leftJoin(table, (key, value) -> key, (streamRecord, tableRecord) -> {
if (tableRecord == null){
return streamRecord;
}
return null;
} )
.peek((key, value) -> log.info("The customerId={} gave the consent={}", key, Optional.ofNullable(value).isPresent())
)
.filter((key, value) -> Optional.ofNullable(value).isPresent())
.peek((key, value) -> log.info("The customerId={} will go to the output", key))
);
}
// TopologyTest.java
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = MyTopology.class)
@ImportAutoConfiguration()
@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
@ExtendWith(MockitoExtension.class)
public class MyTopologyTest {
private static final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
private static final Serde<String> stringSerde = Serdes.String();
private static final KafkaProtobufSerde<com.google.protobuf.Message> kafkaProtobufSerde = new KafkaProtobufSerde<>(schemaRegistryClient);
private KafkaProtobufSerde<CustomerConsentProto.CustomerConsent> protobufSerdeCustomerConsent() {
KafkaProtobufSerde<CustomerConsentProto.CustomerConsent> serde = new KafkaProtobufSerde<>(schemaRegistryClient, CustomerConsentProto.CustomerConsent.class);
Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://fake");
serde.configure(serdeConfig, false);
return serde;
}
@Test
void myTopology_processEventsProto_shouldConsumeFilterAndPublish() {
StreamsBuilder streamsBuilder = new StreamsBuilder();
// Parameter 1 // kafkaProtobufSerde
KStream<String, com.google.protobuf.Message> inputEventStream = streamsBuilder.stream("inputEvent.proto", Consumed.with(stringSerde, kafkaProtobufSerde));
// Parameter 2
String globalTableTopic = "customer-consent.proto";
String globalStore = "globalStore";
GlobalKTable<String, CustomerConsentProto.CustomerConsent> consentGlobalTable = streamsBuilder.globalTable(globalTableTopic, Consumed.with(stringSerde, protobufSerdeCustomerConsent()), Materialized.as(Stores.inMemoryKeyValueStore(globalStore)));
// Creating function
BiFunction<KStream<String, com.google.protobuf.Message>, GlobalKTable<String, CustomerConsentProto.CustomerConsent>, KStream<String, com.google.protobuf.Message>> processStream = new MyTopology().processEventsProto();
KStream<String, com.google.protobuf.Message> customerConsentEventKStream = processStream.apply( inputEventStream, consentGlobalTable);
// Parameter 3 - output
customerConsentEventKStream.to("outputEvent.proto", Produced.with(stringSerde, kafkaProtobufSerde));
try (TopologyTestDriver testDriver = new TopologyTestDriver(streamsBuilder.build())) {
TestInputTopic<String, com.google.protobuf.Message> inputTopic = testDriver.createInputTopic(
"inputEvent.proto",
stringSerde.serializer(),
kafkaProtobufSerde.serializer());
TestInputTopic<String, CustomerConsentProto.CustomerConsent> globalKTableTopic = testDriver.createInputTopic(
"customer-consent.proto",
stringSerde.serializer(),
protobufSerdeCustomerConsent().serializer());
TestOutputTopic<String, com.google.protobuf.Message> outputTopic = testDriver.createOutputTopic(
"outputEvent.proto",
stringSerde.deserializer(),
kafkaProtobufSerde.deserializer()
);
// Event to the Parameter 1
String key = "121380835";
com.google.protobuf.Message inputEvent = getInputEvent(key);
TestRecord<String, com.google.protobuf.Message> record = new TestRecord<>(key, inputEvent);
record.headers();
inputTopic.pipeInput(record);
// Event to the Parameter 2
String key2 = "121380840";
CustomerConsentProto.CustomerConsent consentUpdatedEvent = getConsent(key);
TestRecord<String, CustomerConsentProto.CustomerConsent> recordGlobalTable = new TestRecord<>(key2, consentUpdatedEvent);
recordGlobalTable.headers();
globalKTableTopic.pipeInput(recordGlobalTable);
// Checking output
Assertions.assertThat(outputTopic.isEmpty()).isFalse();
InputEventProto.InputEvent response = (InputEventProto.InputEvent) outputTopic.readValue();
Assertions.assertThat(inputEvent).isEqualTo(response);
}
}
}