Proper way to use Schema Registry with StreamJoins

Hello, I am currently trying to set up a project with using Protobufs for serde instead of Json. It is a Spring Cloud Kafka Streams project, and for functions that don’t use joins everything is working with the new Protobuf classes. My current stream join looks like:

firstStream.join(
    secondStream,
    myJoiner,
    JoinWindows.of(Duration.ofMinutes(2)),
    StreamJoined.with(Serdes.String(), JsonSerde(FirstResult::class.java),
            JsonSerde(SecondResult::class.java)),
)

and I have updated it to this:

firstStream.join(
    secondStream,
    myJoiner,
    JoinWindows.of(Duration.ofMinutes(2)),
    StreamJoined.with(Serdes.String(), KafkaProtobufSerde(FirstResultProto::class.java),
            KafkaProtobufSerde(SecondResultProto::class.java)),
)

but I get the following exception:

Exception in thread "joinComplete-applicationId-9bf24992-2a50-46e4-8dd7-de9b4d70569f-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=first-run-complete, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.InvalidConfigurationException: SchemaRegistryClient not found. You need to configure the serializer or use serializer constructor with SchemaRegistryClient.

	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1193)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: SchemaRegistryClient not found. You need to configure the serializer or use serializer constructor with SchemaRegistryClient.

If someone could help me figure out the proper way to set up Schema Registry for stream joins that would be great.

Thanks

Figured it out almost as soon as I posted. I had to use the KafkaProtobufSerde Beans I registered and not create new ones. All working now!

1 Like