Spring Cloud StreamBridge Confluent Avro Converter

Hi,

in our application we use Spring Cloud Stream to produce records to Kafka in Avro format. We used the annotation-based model, which has been deprecated since version 3.1 in favor of the functional programming model. The data source for the producer is a REST endpoint, so according to the documentation, StreamBridge is the recommended way to produce messages. Sending a message looks like this:

...
@Autowired
private lateinit var streamBridge: StreamBridge

fun <T> send(
    topicBindingName: String,
    data: T,
    messageKey: String? = null,
) {
    val messageBuilder = MessageBuilder.withPayload(data)
    // Set the Kafka record key header only when a key was provided.
    messageKey?.let { messageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY, it.toByteArray()) }
    streamBridge.send(
        topicBindingName,
        messageBuilder.build(),
        MimeType("application", "+avro"),
    )
}

StreamBridge will try to find a converter that can handle the message, and it finds AvroSchemaRegistryClientMessageConverter, which serializes the payload as plain Apache Avro. That format does not include the magic byte and schema id that Confluent's wire format prepends to every message. As a workaround I wrote my own converter for the Confluent Avro format.
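For what it's worth, the prefix that the stock converter omits is small and well documented: Confluent's wire format is a single 0x00 magic byte, a 4-byte big-endian schema id, and then the plain Avro binary. A custom converter essentially has to prepend that frame. A minimal sketch of the framing (the helper name frameConfluentAvro is mine, not from any library):

```kotlin
import java.nio.ByteBuffer

// Confluent wire format: [magic byte 0x00][4-byte big-endian schema id][Avro binary].
// avroPayload is assumed to already be Avro-encoded (e.g. by a DatumWriter).
fun frameConfluentAvro(schemaId: Int, avroPayload: ByteArray): ByteArray =
    ByteBuffer.allocate(1 + Integer.BYTES + avroPayload.size)
        .put(0x00.toByte())   // magic byte
        .putInt(schemaId)     // schema id obtained from the schema registry
        .put(avroPayload)     // the plain Avro-serialized record
        .array()
```

The schema id would normally come from registering or looking up the schema in the registry; the framing itself is the only part the plain Avro converter is missing.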

The question is: how should I use Spring Cloud StreamBridge with the Confluent Avro format as the output type? Alternatively, is there another way to implement a Spring Cloud Stream producer with the functional programming model using Confluent's KafkaAvroSerializer?

Thanks.

I haven’t used Spring Cloud in a while, and I didn’t even know there was something like a StreamBridge, but I think you should be able to just configure the correct serializer in the application properties.
What I did run into previously is that it was hard to find a working example; maybe the config I used might help you: kafka-graphql-examples/application.yml at 307bbad6f10e4aaa6b797a3bbe3b6620d3635263 · openweb-nl/kafka-graphql-examples · GitHub.

I’m afraid it’s not just a matter of configuration: AbstractAvroMessageConverter already implements the serialization and deserialization itself (in its convertFromInternal and convertToInternal methods), and those methods take different parameters and work somewhat differently from a regular Kafka serializer and deserializer. I don’t think Spring’s StreamBridge can be used for this until there is either a MessageConverter for Confluent’s Avro wire format or a way to plug in your own serializers and deserializers. For producing Avro messages I used KafkaTemplate, where it works without Spring Cloud …
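For anyone taking the KafkaTemplate route: the serializer can then be set through plain Spring Boot (Spring for Apache Kafka) properties rather than Spring Cloud Stream ones. Roughly like this, where the registry URL is a placeholder for your own environment:

```yaml
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081  # placeholder, point at your registry
```

With this in place, KafkaTemplate delegates to KafkaAvroSerializer, which writes the Confluent wire format (magic byte plus schema id) automatically.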