JSON with Schema Registry

Hello,
I'm quite new to Kafka. After successfully running some HTTP connectors without a schema ("value.converter.schemas.enable": "false"), I'm struggling to use schemas. :frowning:

I’ve added the schema (“schemaType”: “JSON”) of my JSON (where the producer is out of my control) to the schema registry and then modified the configuration of my connector so that it uses it:

"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "https://kafka-schemaregistry:8081/",

The problem is, I get the (in)famous “Unknown magic byte!”

ERROR Error encountered in task test-withschema-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.json.JsonSchemaConverter', where consumed record is {topic='mytopic', partition=0, offset=34, timestamp=1708945734273, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Converting byte to Kafka Connect data failed due to serialization error of topic visitor:
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:236)

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

I've read plenty of posts (including the great https://www.confluent.io/blog/how-to-fix-unknown-magic-byte-errors-in-apache-kafka/ ) but I'm still struggling to fix it.

I’d have a couple of questions:

  1. I manipulate the JSON in my Kafka topic via some transformations so that it matches what I expect at the other end of the connector. Should the schema validate the resulting JSON (after the transformations), or the initial JSON (before the transformations)?
  2. I currently have only one schema in my schema registry, but do I need to somehow tell my connector which schema it should use? If so, how do I do that?

Thanks for any help that lets me troubleshoot this better :slight_smile:

You need to use the KafkaJsonSchemaSerializer when producing to Kafka.
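To see why the serializer matters: the Schema Registry serializers prefix every message with a 5-byte header (a magic byte of 0, then the schema ID as a 4-byte big-endian integer), and the converter reads the schema ID from that header. Plain JSON starts with `{` (0x7B), so the check fails. Below is a minimal Go sketch of that check using only the standard library. The function name `parseWireHeader` and the schema ID 23 are illustrative assumptions, not part of any Confluent API.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// magicByte is the first byte of every message written by a
// Schema Registry aware serializer.
const magicByte = 0x0

// parseWireHeader (hypothetical helper) splits a framed message into its
// schema ID and payload. It mimics the check behind "Unknown magic byte!".
func parseWireHeader(msg []byte) (schemaID uint32, payload []byte, err error) {
	if len(msg) < 5 {
		return 0, nil, errors.New("message shorter than the 5-byte header")
	}
	if msg[0] != magicByte {
		return 0, nil, fmt.Errorf("unknown magic byte 0x%02x", msg[0])
	}
	// Bytes 1..4 hold the schema ID in big-endian order.
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}

func main() {
	// Plain JSON, as written by a producer that does not use the serializer.
	plain := []byte(`{"InvoiceNumber":"1341243123341232"}`)
	if _, _, err := parseWireHeader(plain); err != nil {
		fmt.Println("plain JSON:", err)
	}

	// The same JSON framed with schema ID 23 (the ID is only an example).
	framed := append([]byte{magicByte, 0, 0, 0, 23}, plain...)
	id, payload, err := parseWireHeader(framed)
	fmt.Println(id, string(payload), err)
}
```

If the producer really is out of your control and writes plain JSON, the records in the topic will never carry this header, which is why the converter rejects them regardless of what is registered.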

Battling with this as well…
I build my Schema Registry client and serializer as follows.

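// Create a new Schema Registry client
client, err = schemaregistry.NewClient(schemaregistry.NewConfig(vKafka.SchemaRegistryURL))
// Configure the JSON Schema serializer: no auto-registration, validate each payload
serdes := jsonschema.NewSerializerConfig()
serdes.AutoRegisterSchemas = false
serdes.EnableValidation = true
// 2 is the value of serde.ValueSerde (this serializer is for message values)
serializer, err = jsonschema.NewSerializer(client, 2, serdes)
// Serialize the basket for the target topic
valueBytes, err := serializer.Serialize(vKafka.BasketTopicname, &t_SalesBasket)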

Pushing a payload (see the example below) against the Schema Registry entry (also below), I get this error:

2024/06/20 08:36:10 ERROR: SalesBasket: Failed to Serialize, error: schema registry request failed error code: 40403: Schema not found

Example Payload

{
 	"InvoiceNumber": "1341243123341232",
	"SaleDateTime": "2024-06-11T16:53:39.911+02:00",
	"SaleTimetamp": "1718117619911",
	"Store" : {
		"Id": "2143412",
		"Name": "sdfgsjdjndnjdfgs"
		},
	"Clerk": {
		"Id": "231",
		"Name": "grfvnowifgbvuwe"
		},
	"TerminalPoint": "124",
	"BasketItems":[
		{
			"Id": "234123412",
			"Name": "",
			"Brand": "fgtwruyergfd",
			"Category": "",
			"Price":12412.00,
			"Quantity":3
		},
		{
			"Id": "234123421",
			"Name": "fgtwergo678d",
			"Brand": "",
			"Category": "",
			"Price":12.00,
			"Quantity":3
		},
		{
			"Id": "534123412",
			"Name": "fadsagergfd",
			"Brand": "",
			"Category": "",
			"Price":112.00,
			"Quantity":2
		},
		{
			"Id": "224123412",
			"Name": "fgtwerg5fd",
			"Brand": "",
			"Category": "",
			"Price":22.00,
			"Quantity":4
		},
		{
			"Id": "234123412",
			"Name": "fgtwe1ergfd",
			"Brand": "",
			"Category": "",
			"Price":4.99,
			"Quantity":1
		}
	],
	"Net": 442.23,
	"VAT":10.00,
	"Total":452.23 
}

Schema entered into cc

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "properties": {
      "BasketItems": {
        "items": [
          {
            "properties": {
              "Brand": {
                "type": "string"
              },
              "Category": {
                "type": "string"
              },
              "Id": {
                "type": "string"
              },
              "Name": {
                "type": "string"
              },
              "Price": {
                "type": "number"
              },
              "Quantity": {
                "type": "integer"
              }
            },
            "required": [
              "Id",
              "Name",
              "Brand",
              "Category",
              "Price",
              "Quantity"
            ],
            "type": "object"
          }
        ],
        "type": "array"
      },
      "Clerk": {
        "properties": {
          "Id": {
            "type": "string"
          },
          "Name": {
            "type": "string"
          }
        },
        "required": [
          "Id",
          "Name"
        ],
        "type": "object"
      },
      "InvoiceNumber": {
        "type": "string"
      },
      "Net": {
        "type": "number"
      },
      "SaleDateTime": {
        "type": "string"
      },
      "SaleTimetamp": {
        "type": "string"
      },
      "Store": {
        "properties": {
          "Id": {
            "type": "string"
          },
          "Name": {
            "type": "string"
          }
        },
        "required": [
          "Id",
          "Name"
        ],
        "type": "object"
      },
      "TerminalPoint": {
        "type": "string"
      },
      "Total": {
        "type": "number"
      },
      "VAT": {
        "type": "number"
      }
    },
    "required": [
      "InvoiceNumber",
      "SaleDateTime",
      "SaleTimetamp",
      "Store",
      "Clerk",
      "TerminalPoint",
      "BasketItems",
      "Net",
      "VAT",
      "Total"
    ],
    "type": "object"
  }

Executing the curl command below

curl --silent -X GET http://localhost:8081/schemas |grep JSON |grep salesbasket | jq .

returns the following, which matches what the cc interface shows for the schema:

{
    "subject": "json_salesbaskets-value",
    "version": 10,
    "id": 23,
    "schemaType": "JSON",
    "schema": "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"type\":\"object\",\"properties\":{\"InvoiceNumber\":{\"type\":\"string\"},\"SaleDateTime\":{\"type\":\"string\"},\"SaleTimetamp\":{\"type\":\"string\"},\"Store\":{\"type\":\"object\",\"properties\":{\"Id\":{\"type\":\"string\"},\"Name\":{\"type\":\"string\"}},\"required\":[\"Id\",\"Name\"]},\"Clerk\":{\"type\":\"object\",\"properties\":{\"Id\":{\"type\":\"string\"},\"Name\":{\"type\":\"string\"}},\"required\":[\"Id\",\"Name\"]},\"TerminalPoint\":{\"type\":\"string\"},\"BasketItems\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"Id\":{\"type\":\"string\"},\"Name\":{\"type\":\"string\"},\"Brand\":{\"type\":\"string\"},\"Category\":{\"type\":\"string\"},\"Price\":{\"type\":\"number\"},\"Quantity\":{\"type\":\"integer\"}},\"required\":[\"Id\",\"Name\",\"Brand\",\"Category\",\"Price\",\"Quantity\"]}]},\"Net\":{\"type\":\"number\"},\"VAT\":{\"type\":\"number\"},\"Total\":{\"type\":\"number\"}},\"required\":[\"InvoiceNumber\",\"SaleDateTime\",\"SaleTimetamp\",\"Store\",\"Clerk\",\"TerminalPoint\",\"BasketItems\",\"Net\",\"VAT\",\"Total\"]}"
  },
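The subject in that output follows the default topic-name strategy, where the serializer looks up schemas under "<topic>-key" or "<topic>-value". A minimal Go sketch of that mapping, useful for checking that the topic passed to Serialize lines up with the registered subject. `subjectFor` is a hypothetical helper, and the topic name "json_salesbaskets" is an assumption inferred from the subject shown above.

```go
package main

import "fmt"

// subjectFor (hypothetical helper) returns the subject used by the default
// topic-name strategy: "<topic>-key" for keys, "<topic>-value" for values.
func subjectFor(topic string, isKey bool) string {
	if isKey {
		return topic + "-key"
	}
	return topic + "-value"
}

func main() {
	// Assumed topic name, inferred from the subject "json_salesbaskets-value".
	fmt.Println(subjectFor("json_salesbaskets", false))
}
```

If the value of vKafka.BasketTopicname differs from the topic part of the registered subject, a lookup under the derived subject cannot find the schema.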

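Solved: I needed serdes.UseLatestVersion = true. As far as I can tell, without it (and with AutoRegisterSchemas = false) the serializer looks up a schema it derives from the Go struct rather than the registered one, and that lookup is what fails with 40403.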

serdes.AutoRegisterSchemas = false
serdes.EnableValidation = true
serdes.UseLatestVersion = true
serializer, err = jsonschema.NewSerializer(client, serde.ValueSerde, serdes)

So, trying to behave and follow the communicated best practices!

With the above, using the jsonschema serializer with validation from the Confluent libraries, I can do 500 messages/sec.

With schema validation removed and using plain json.Marshal(t_basket), I can do 10,000 messages/sec.

That's one seriously huge difference.
PS: doing the same process using Protobuf, I get to about 9,000 messages/sec.

@rmoff, have you seen a difference this large before?

G