Routing messages with different types/schemas

I just started using ksqlDB and am pretty impressed with it. But I have run into something I would have thought was possible, yet it appears it isn't, or I am thinking about it the wrong way. I have a topic that receives messages of different types, and I want to route them to specific topics based on their type.

All messages are JSON and follow the same high-level format, similar to an envelope: the top-level fields are metadata, and one field contains the payload. The payload differs for each record type, but the schema is identical up to the Payload field. At this point I don't care about the payload's contents, and I would like the "routing" stream to ignore it entirely.

Example fields:
Payload: JSON,
LastModifiedDate: VARCHAR,
Key: VARCHAR,
type: VARCHAR,
Name: VARCHAR,
CreatedDate: VARCHAR,
Id: VARCHAR,
Status: VARCHAR
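
To make the goal concrete, here is roughly what I am after, sketched as ksqlDB statements. The topic and stream names are made up, and declaring Payload as VARCHAR is exactly the part I am unsure about:

```sql
-- Sketch only: topic and stream names are illustrative.
-- Declare a stream over the shared envelope on the existing topic.
CREATE STREAM all_events (
    Payload          VARCHAR,  -- the per-type part I want to stay agnostic about
    LastModifiedDate VARCHAR,
    `Key`            VARCHAR,  -- quoted: KEY is a reserved word in ksqlDB
    `type`           VARCHAR,
    Name             VARCHAR,
    CreatedDate      VARCHAR,
    Id               VARCHAR,
    Status           VARCHAR
  ) WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');
```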

See this article for a good explanation of why you might do this: Should You Put Several Event Types in the Same Kafka Topic? | Confluent

I found a couple of examples that didn't work. One solution is to declare the Payload attribute as VARCHAR, but that turns the nested JSON into a string and escapes all the double quotes (e.g. `{"a":1}` becomes `"{\"a\":1}"`). This would not work for the consumers.

This article shows a similar solution, but it only works by explicitly extracting individual fields. Working with heterogenous JSON records using ksqlDB

How would others approach this problem with ksqlDB? My alternative would be to write a producer/consumer service that does the routing manually, but we already have ksqlDB set up, and it seems like the better solution if I can make it work. Also, related to the problem: how do people handle rapid greenfield development where the schema is in flux early on?

Thanks

Hi @tparker, you can definitely use ksqlDB to do this.
I wrote about it here and here (section called “Splitting one stream into many”).

You can see slides & a talk describing it here. Let me know if you have any questions.
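
The gist of the splitting is a few persistent queries filtering on the type column, roughly like this (assuming a source stream along the lines you described; names are illustrative):

```sql
-- Each statement continuously writes matching records to its own
-- backing topic, routing on the envelope's type field alone.
CREATE STREAM order_events  AS SELECT * FROM all_events WHERE `type` = 'order';
CREATE STREAM status_events AS SELECT * FROM all_events WHERE `type` = 'status';
```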

Hi Robin,
Thanks for the advice. These are great resources; I had found a few others but not these. I will read them in more depth to get as much out of them as I can.

I did look at each of them and was not able to get quite what I need. Previously I was able to encode the payload as a VARCHAR, which basically turns the nested JSON payload into a string with escaped quotes. Your examples do something similar, but they would still require knowing the schema; I am just punting it down the line a bit. Then again, I suppose the schema should be known at the topic I route to. I would still like to stay schema-agnostic, since Kafka is being used purely as a passthrough.

Another example uses the STRUCT data type. I saw that in another blog post, but it didn't seem to be supported and I couldn't find it in the docs. You apparently need to supply a schema for it, e.g. STRUCT<field1 VARCHAR, field2 INT, ...>. This looks promising if it is possible.
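
If STRUCT does work here, I imagine the declaration would look something like this (the payload field names are made up, and it would require knowing the payload schema up front, which is what I was hoping to avoid):

```sql
-- Hypothetical STRUCT declaration for the payload.
CREATE STREAM typed_events (
    `type`  VARCHAR,
    Payload STRUCT<field1 VARCHAR, field2 INT>
  ) WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');
```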

The video is cool. I couldn't get it to work correctly yet, but I will keep at it. It looks like it is extracting specific data out of an array of JSON, and it appears the extracted data would still be escaped strings; maybe something in the conversion to Avro handles that. I will try to run and deconstruct your full example to see how it works and whether it will help.
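
For reference, the extraction pattern I mean is roughly this (the JSONPath is hypothetical):

```sql
-- EXTRACTJSONFIELD takes a VARCHAR and a JSONPath and returns the
-- matched field as a VARCHAR.
SELECT EXTRACTJSONFIELD(Payload, '$.orderId') AS order_id
  FROM all_events
  EMIT CHANGES;
```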

Again, thanks for all your help. Your blogs have been very useful.
