I am using Spring Cloud Stream functional programming model and trying to produce a Protobuf message using a protobuf schema. When receiving the message on the Consumer side, I am trying to deserialize with the KafkaProtobufSerializer. All the configuration properties are specified via yml file using the functional programming model in Spring Cloud stream. I have also specified the property “specific.protobuf.value.type” to help the consumer convert the Dynamic Message which is coming in. However, even after trying a lot of things, the code generates an error. The main error is "Invalid value XXX.ClassName for configuration specific.protobuf.value.type: com.bugni.demo.screening.ScreeningResponseImpl.ScreeningResponse could not be found.
This class ScreeningResponse is present under a generated class ScreeningResponseImpl under the src/java/main/…/ directory. It has been generated via the plugin: org.xolstice.maven.plugins
protobuf-maven-plugin
I can use the generated class under the main source code to specify it as a Type in the @Bean Function which acts as the message handler. But, at runtime, the above error is occurring and blocking further progress. Finding documentation for Spring Cloud Stream Functional model-based usage for Confluent has been tremendously hard. I would really appreciate any help on this.
Following yaml file is used for the spring boot service which has the consumer:
spring:
cloud:
stream:
function:
definition: processResult
bindings:
processResult-in-0:
destination: response-topic
group: response_consumer
consumer:
useNativeDecoding: true
kafka:
bindings:
processResult-in-0:
consumer:
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
specific.protobuf.value.type: com.bugni.demo.screening.ScreeningResponseImpl.ScreeningResponse
schema.registry.url: http://localhost:8081
Consumer message handler bean is like this:-
@Bean
public Consumer processResult() {
The producer of ScreeningResponse is like this ():-
@Bean
public Function<ApplicationRequest, ScreeningResponse> processPepCheck() {
return application → {
System.out.println("XYZ processor received : " + application);
ScreeningResponse response = ScreeningResponse.newBuilder()
.setApplicationId(application.getId())
.setResponseStatus(ScreeningResponseImpl.ResponseStatus.ACCEPT)
.setScreeningType(ScreeningResponseImpl.ScreeningType.XYZ)
.setResponseMetadata(“Successful response”).build();
System.out.println("Screening response is : " + response);
return response;
};
The pom has the following main/relevant dependencies:-
org.springframework.cloud
spring-cloud-stream
org.springframework.cloud
spring-cloud-stream-binder-kafka
org.springframework.kafka
spring-kafka
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>7.3.2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.21.12</version>
</dependency>
I have Confluent installed locally to run this. The message is getting published in the response-topic successfully ( although it does show content-type as ‘application/json’ in the Confluent Control Center messages tab)