Kafka Connect, RabbitMQ, and JSON Schema

@rmoff I also need to read data from RabbitMQ (I don’t have a choice in this) and inject it into Kafka (we are on Confluent Cloud). The data coming from RabbitMQ is JSON. I am using the RabbitMQ Source Connector. I can see the data with ByteArrayConverter, but our setup requires a schema, so I am trying to use JsonSchemaConverter. However, if I set value.converter=io.confluent.connect.json.JsonSchemaConverter

I get the error

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):
Invalid value io.confluent.connect.json.JsonSchemaConverter for configuration value.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}

What am I doing wrong? Thanks for your help!

You need to set value.converter to match the serialisation of the source data, not the serialisation that you would like to use (see this blog for a further explanation).

Therefore you are correct to use org.apache.kafka.connect.converters.ByteArrayConverter. If you want to use a serialisation type with support for schemas (which is a good thing to be doing, for sure) you will need a stream processor to apply the schema to the data. One option is to use ksqlDB as shown in this blog.
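
As a rough illustration of the converter setting described above, here is a minimal sketch that builds a connector configuration as JSON. The connector name, topic, queue, and host are hypothetical placeholders, and a real deployment needs more settings (credentials and so on) than are shown here.

```python
import json

# Hypothetical name, topic, queue, and host: replace with your own values.
connector = {
    "name": "rabbitmq-source",
    "config": {
        "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "kafka.topic": "rabbitmq-raw",
        "rabbitmq.queue": "my-queue",
        "rabbitmq.host": "localhost",
        # The connector hands over raw bytes, so the converter must match that,
        # even though the payload inside those bytes happens to be JSON.
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    },
}

# Serialise the configuration so it can be sent to the Kafka Connect REST API.
payload = json.dumps(connector, indent=2)
print(payload)
```

The resulting JSON can be sent as the body of a POST to the Kafka Connect REST API's /connectors endpoint. Applying a schema then happens downstream of this topic, in the stream processor.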

What version of Kafka Connect are you running? Support for JSON Schema was added in Confluent Platform 5.5.

Thanks for pointing that out. We are using 5.4.1.

My JSON has four levels of nesting. Would JsonSchemaConverter be able to handle that?

I was trying to use the FromJson transform from jcustenborder/kafka-connect-json-schema (on GitHub), but it fails to start with this four-level nested schema.

Is there any limit on how deeply nested the JSON can be in this FromJson transform?

And does that limitation get removed if I upgrade to 5.5 and use JsonSchemaConverter?

Or is there no known limit on the level of nesting in either of them, and I am having the problem because my schema spec has an error?

Thanks for your help!

Got the FromJson transform to work after fixing the schema spec.
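
For anyone hitting something similar, a quick sanity check of the schema file before deploying can help. The thread does not say what the actual error was, so this is only a sketch: the schema below is invented for illustration, and the "missing type" check is a simple lint of my own, not a JSON Schema rule and not something the FromJson transform is known to enforce.

```python
import json

# Hypothetical four-level schema, invented for illustration.
SCHEMA_TEXT = """
{
  "type": "object",
  "properties": {
    "order": {
      "type": "object",
      "properties": {
        "customer": {
          "type": "object",
          "properties": {
            "address": {
              "type": "object",
              "properties": {"city": {"type": "string"}}
            }
          }
        }
      }
    }
  }
}
"""

def max_depth(schema, depth=1):
    """Count levels of nested objects; the root object is level 1."""
    props = schema.get("properties", {})
    nested = [max_depth(sub, depth + 1)
              for sub in props.values() if sub.get("type") == "object"]
    return max(nested, default=depth)

def missing_types(schema, path="$"):
    """List every (sub)schema that does not declare a 'type'."""
    problems = [] if "type" in schema else [f"{path}: no 'type' declared"]
    for name, sub in schema.get("properties", {}).items():
        problems.extend(missing_types(sub, f"{path}.{name}"))
    return problems

# json.loads raises json.JSONDecodeError if the spec is not well-formed JSON.
schema = json.loads(SCHEMA_TEXT)
print(max_depth(schema))
print(missing_types(schema))
```

A check like this only catches malformed JSON and obvious omissions; it is not a substitute for a full JSON Schema validator.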
