Hi,
in our application we use Spring Cloud Stream to produce records to Kafka in Avro format. We used the annotation-based programming model, which has been deprecated since version 3.1 in favor of the functional programming model. The data source for the producer is a REST endpoint, so, according to the documentation, StreamBridge is the recommended way to produce messages. Sending a message currently looks like this:
...
@Autowired
private lateinit var streamBridge: StreamBridge

fun <T> send(
    topicBindingName: String,
    data: T,
    messageKey: String? = null,
) {
    val messageBuilder = MessageBuilder.withPayload(data)
    // The Kafka message key is passed as a header, serialized to bytes.
    messageKey?.let { messageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY, it.toByteArray()) }
    // The explicit output content type tells StreamBridge which
    // message converter to use for the payload.
    streamBridge.send(
        topicBindingName,
        messageBuilder.build(),
        MimeType("application", "+avro"),
    )
}
StreamBridge tries to find a converter suitable for the requested content type and picks AvroSchemaRegistryClientMessageConverter, which serializes the payload as plain Apache Avro. That output does not include the Confluent wire-format prefix, i.e. the magic byte and the 4-byte schema id at the beginning of the message, so consumers using Confluent's deserializer cannot read it. As a workaround, I wrote my own message converter for the Confluent Avro format.
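To illustrate the workaround, here is a minimal sketch of such a converter (not my exact implementation), assuming Confluent's kafka-avro-serializer is on the classpath; the class name is illustrative, and resolving the topic from the KafkaHeaders.TOPIC header is an assumption about how the caller provides it:

import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.specific.SpecificRecord
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.MessageHeaders
import org.springframework.messaging.converter.AbstractMessageConverter
import org.springframework.util.MimeType

class ConfluentAvroMessageConverter(schemaRegistryUrl: String) :
    AbstractMessageConverter(MimeType("application", "+avro")) {

    // KafkaAvroSerializer writes the Confluent wire format: magic byte 0x00,
    // then the 4-byte schema id, then the Avro-encoded payload.
    private val serializer = KafkaAvroSerializer().apply {
        configure(mapOf("schema.registry.url" to schemaRegistryUrl), false)
    }

    override fun supports(clazz: Class<*>): Boolean =
        SpecificRecord::class.java.isAssignableFrom(clazz)

    override fun convertToInternal(
        payload: Any,
        headers: MessageHeaders?,
        conversionHint: Any?,
    ): Any? {
        // The serializer needs the topic to derive the registry subject
        // (<topic>-value); we assume the caller puts it in a header.
        val topic = headers?.get(KafkaHeaders.TOPIC) as? String
            ?: error("Topic header required for subject resolution")
        return serializer.serialize(topic, payload)
    }
}

Registering a converter like this as a @Bean makes Spring Cloud Stream add it to its composite message converter, so the content type used in the send call above resolves to it.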
The question is: how should I use Spring Cloud Stream's StreamBridge with the Confluent Avro wire format as the output type? Alternatively, is there another way to implement a Spring Cloud Stream producer with the functional programming model using Confluent's KafkaAvroSerializer?
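For the second option, my understanding is that native encoding would bypass Spring's message converters entirely and hand the payload to Confluent's serializer. A sketch of the configuration I have in mind (binding name, topic, and registry URL are placeholders):

spring:
  cloud:
    stream:
      bindings:
        producer-out-0:            # placeholder binding name used with StreamBridge
          destination: my-topic    # placeholder topic
          producer:
            use-native-encoding: true   # skip Spring's message converters
      kafka:
        bindings:
          producer-out-0:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081   # placeholder

With use-native-encoding enabled, the binder should let KafkaAvroSerializer produce the Confluent wire format, but I am not sure this is the intended approach when sending through StreamBridge.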
Thanks.