KafkaJsonSchemaSerializer unable to retrieve schema from Schema Registry

I have setup a free account in Confluent Cloud to try out Schema Registry. I created the cluster and topics as mentioned in SCHEMA-REGISTRY-101 tutorial. I played around with Avro and ProtoBuf schema as per the tutorial and everything worked well. I was able to produce as well as consume the events from both topics (avro as well as protobuf).

Next I created a new topic to try out JSON Schema. I registered the JSON Schema using Gradle registerSchemasTask. I verified that schema was registered using Confluent Cloud UI. This is how the schema looks like on Confluent Cloud

{
  "$id": "purchase",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "customer_id": {
      "type": "string"
    },
    "desc": {
      "type": "string"
    },
    "employee_id": {
      "default": "unknown",
      "type": "string"
    },
    "item": {
      "type": "string"
    },
    "total_cost": {
      "type": "number"
    }
  },
  "type": "object"
}

When I try to produce an event to this json topic I get the following exception. The exception.

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error retrieving JSON schema: {"$schema":"http://json-schema.org/draft-07/schema#","title":"Purchase With Json Schema","type":"object","additionalProperties":false,"properties":{"item":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]},"totalCost":{"type":"number"},"customerId":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]},"employeeId":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]},"desc":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]}},"required":["totalCost"]}
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
	at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:141)
	at io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer.serialize(KafkaJsonSchemaSerializer.java:79)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:945)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:905)
	at io.confluent.developer.ProducerWithJsonSchemaSerializerApp.lambda$producePurchaseEvents$2(ProducerWithJsonSchemaSerializerApp.java:51)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at io.confluent.developer.ProducerWithJsonSchemaSerializerApp.producePurchaseEvents(ProducerWithJsonSchemaSerializerApp.java:51)
	at io.confluent.developer.ProducerWithJsonSchemaSerializerApp.main(ProducerWithJsonSchemaSerializerApp.java:83)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:458)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:434)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:316)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:539)
	at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:118)
	... 8 more

I have searched online and unable to figure out what is wrong here. Appreciate your help here.

Notice your error includes a field called totalCost, for example, which does not match total_cost… It seems that the library you are using to do the object conversion has generated a completely different schema, which could not be registered

1 Like

Thanks for the response. I matched the fields in my producer event with the name in schema but I still get the same issue. I changed the totalCost to be String so its optional by default.

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error retrieving JSON schema: {"$schema":"http://json-schema.org/draft-07/schema#","title":"Purchase With Json Schema","type":"object","additionalProperties":false,"properties":{"item":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]},"totalCost":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]},"customerId":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]},"employeeId":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]}}}
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
	at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:141)
	at io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer.serialize(KafkaJsonSchemaSerializer.java:79)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:945)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:905)
	at io.confluent.developer.ProducerWithJsonSchemaSerializerApp.lambda$producePurchaseEvents$2(ProducerWithJsonSchemaSerializerApp.java:51)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at io.confluent.developer.ProducerWithJsonSchemaSerializerApp.producePurchaseEvents(ProducerWithJsonSchemaSerializerApp.java:51)
	at io.confluent.developer.ProducerWithJsonSchemaSerializerApp.main(ProducerWithJsonSchemaSerializerApp.java:83)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

And here is my latest schema in Confluent CLoud

{
  "$id": "purchase",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "customerId": {
      "type": "string"
    },
    "employeeId": {
      "type": "string"
    },
    "item": {
      "type": "string"
    },
    "totalCost": {
      "type": "string"
    }
  },
  "type": "object"
}