Transform json message based on message key

Hi, I have a topic that can receive json messages of different formats. The message key is used to identify the type of record. Is there a way to apply a message specific schema to each incoming record based on the key?

If you do it “manually” yes. Also depends how many different keys you have – if it’s only a few keys, KStream#split() could help.

For full flexibility, you would read the value as byte[] first, and apply a transform() to apply the schema based on the key to deserialize the data.

Thank you so much. Will try this out.

There is only one key for each message. So based on the key, the json transformation has to be applied for each kafka message payload (value)

Hello, Here is my use case. I have heterogenous json messages flowing into 1 topic. Based on the message key, I have to apply a transformation to the message and that message has to be written to a specific table in mysql. Each message type has it’s own table.

Now the transformation I am able to do in Kafka Streams. How can I write to mysql from streams. I don’t want to publish this transformed message again to another topic.

It’s highly recommended to write the data back into a topic, and use Connect to push the data to MySQL.

If you want to push directly, you could use a process() call, and use a MySQL client within it. But you need to take care of a lot of complex error handling yourself for this case (that’s why it is not recommended).

Yes, that is what I am doing. In a kstream, looping thru each message and processing it using jackson for processing the json and then publishing back to a topic which is picked up by a jdbc sink connector to post to MySQL. thank you for your suggestions.

1 Like