How to save Kafka header values in a MongoDB sink?

Right now I have a MongoDB sink connector. My incoming Kafka messages carry audit logs in the Kafka headers. The header values are byte arrays and the message values are Avro. How can I best save the header values (decoded to JSON) to MongoDB through the sink?

Have you tried to use a transform?

@OneCricketeer Yeah, sadly there is no transform for my use case that I know of. HeaderFrom adds headers from values, and I need it the other way around.

What about this one?

https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-common/transformations/HeaderToField.html

@OneCricketeer Yes, I have used this. The only problem is that it saves the values as Base64 in the Mongo sink. How can I save them as JSON?

There’s a BytesToString transform in the same project.

Otherwise, you should be able to decode your data at query time.
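If you go the query-time route, the decoding itself is small. Here is a minimal sketch in plain Java, assuming the header bytes were UTF-8 JSON text and that your client hands you the stored value as a Base64 string. `HeaderDecoder` and `decodeBase64Json` are made-up names for illustration, not part of any connector or driver.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class HeaderDecoder {
    private HeaderDecoder() {}

    // Turns a Base64 string back into the text the header originally carried.
    // Assumption: the original header bytes were UTF-8 encoded JSON.
    static String decodeBase64Json(String base64) {
        byte[] raw = Base64.getDecoder().decode(base64);
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```

For example, `HeaderDecoder.decodeBase64Json("eyJhIjoxfQ==")` gives back `{"a":1}`. This only helps after the document has been read; it does not make the field filterable on the server side.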

@OneCricketeer Hmm, I am going to try this.
I don't think decoding at query time is very practical, for example when filtering.
I was also thinking of using a custom post processor that casts the bytes to JSON: https://www.mongodb.com/docs/kafka-connector/current/sink-connector/fundamentals/post-processors/
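The core of such a post processor would be turning the header bytes into text before parsing them as JSON. Below is a rough sketch of just that step in plain Java, assuming the headers hold UTF-8 encoded JSON. `HeaderBytes` and `toText` are hypothetical names, and the wiring into the connector's post processor API is deliberately not shown.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

final class HeaderBytes {
    private HeaderBytes() {}

    // Strictly decodes header bytes as UTF-8.
    // Returns Optional.empty() for null input or bytes that are not valid UTF-8,
    // so the caller can decide to keep the raw bytes instead of storing garbage.
    static Optional<String> toText(byte[] raw) {
        if (raw == null) {
            return Optional.empty();
        }
        try {
            String text = StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(raw))
                    .toString();
            return Optional.of(text);
        } catch (CharacterCodingException e) {
            return Optional.empty();
        }
    }
}
```

The decoded string could then be parsed into a document before it is written, so that filters work on real fields rather than on an opaque blob. Whether the text is actually valid JSON still has to be checked by whatever parser is used.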
