JSON with Schema Registry

Hello,
I’m quite new to Kafka. After successfully running some HTTP connectors without schemas ("value.converter.schemas.enable": "false"), I’m now struggling to use schemas.

I’ve added the schema of my JSON messages (whose producer is out of my control) to the Schema Registry with "schemaType": "JSON", and then modified my connector configuration to use it:

"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "https://kafka-schemaregistry:8081/",

The problem is that I get the (in)famous “Unknown magic byte!” error:

ERROR Error encountered in task test-withschema-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.json.JsonSchemaConverter', where consumed record is {topic='mytopic', partition=0, offset=34, timestamp=1708945734273, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Converting byte to Kafka Connect data failed due to serialization error of topic visitor:
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:236)

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
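The “Unknown magic byte!” error comes from the very first check the deserializer makes. Messages written by Confluent’s Schema Registry serializers start with a 5-byte header: a magic byte of 0, then the schema ID as a 4-byte big-endian integer, with the JSON payload after it. A plain JSON message starts with "{" instead, so the check fails before any schema is looked up. The Python sketch below only mimics that header check for illustration: read_schema_id is a hypothetical helper, not part of any Confluent library (the real check lives in the Java deserializer), and the length check is a simplification.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Schema Registry-framed message


def read_schema_id(message: bytes) -> int:
    """Return the schema ID from a Schema Registry-framed message.

    Raises ValueError, mirroring the "Unknown magic byte!" failure, when the
    message does not start with the 5-byte wire-format header.
    """
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("Unknown magic byte!")
    # Bytes 1-4 hold the schema ID as a big-endian 32-bit integer.
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id


plain_json = b'{"id": 42}'                               # what a plain JSON producer writes
framed = b"\x00" + struct.pack(">I", 7) + plain_json     # same payload with header, schema ID 7

print(read_schema_id(framed))  # 7
try:
    read_schema_id(plain_json)
except ValueError as err:
    print(err)  # Unknown magic byte!
```

Running it prints 7 for the framed message and "Unknown magic byte!" for the plain JSON one, because the plain message’s first byte is 0x7B ("{"), not 0.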

I’ve read plenty of posts (including the great https://www.confluent.io/blog/how-to-fix-unknown-magic-byte-errors-in-apache-kafka/), but I’m still struggling to fix it.

I have a couple of questions:

  1. I use some transformations to reshape the JSON in my Kafka topic so that it matches what I expect at the other end of the connector. Should the schema validate the resulting JSON (after the transformations), or the initial JSON (before the transformations)?
  2. I currently have only one schema in my Schema Registry. Do I need to tell my connector which schema it should use? If so, how?

Thanks for any help troubleshooting this issue!

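You need to use the KafkaJsonSchemaSerializer when producing to Kafka: JsonSchemaConverter can only read messages written in the Schema Registry wire format, and plain JSON isn’t in that format.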
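KafkaJsonSchemaSerializer is a Java class, but the framing it adds to each message can be sketched in a few lines of Python. This is an assumption-laden illustration, not the serializer itself: frame_json is a hypothetical helper, and the schema ID and the field name visitorId are made up. The real serializer also obtains the schema ID from Schema Registry (by default under the subject "<topic>-value") before writing a magic byte of 0, the 4-byte big-endian schema ID, and then the JSON bytes. On the consuming side, the converter reads that ID from each record and fetches the matching schema, so the schema is identified per record rather than by connector configuration.

```python
import json
import struct


def frame_json(value: dict, schema_id: int) -> bytes:
    """Prefix a JSON payload with the Schema Registry wire-format header."""
    header = b"\x00" + struct.pack(">I", schema_id)  # magic byte + schema ID
    return header + json.dumps(value).encode("utf-8")


# Hypothetical record and schema ID, for illustration only.
record = frame_json({"visitorId": "abc"}, schema_id=1)
print(record[:5])  # b'\x00\x00\x00\x00\x01'
print(record[5:])  # b'{"visitorId": "abc"}'
```

The payload after the header is ordinary UTF-8 JSON, so the only thing a plain JSON producer is missing is those first five bytes.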