I have set up a free account in Confluent Cloud to try out Schema Registry. I created the cluster and topics as described in the SCHEMA-REGISTRY-101 tutorial. I played around with Avro and Protobuf schemas as per the tutorial and everything worked well. I was able to produce as well as consume events from both topics (Avro as well as Protobuf).
Next, I created a new topic to try out JSON Schema. I registered the JSON Schema using the Gradle registerSchemasTask and verified in the Confluent Cloud UI that the schema was registered. This is what the schema looks like in Confluent Cloud:
{
  "$id": "purchase",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "customer_id": {
      "type": "string"
    },
    "desc": {
      "type": "string"
    },
    "employee_id": {
      "default": "unknown",
      "type": "string"
    },
    "item": {
      "type": "string"
    },
    "total_cost": {
      "type": "number"
    }
  },
  "type": "object"
}
When I try to produce an event to this JSON topic, I get the following exception:
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error retrieving JSON schema: {"$schema":"http://json-schema.org/draft-07/schema#","title":"Purchase With Json Schema","type":"object","additionalProperties":false,"properties":{"item":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]},"totalCost":{"type":"number"},"customerId":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]},"employeeId":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]},"desc":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]}},"required":["totalCost"]}
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:141)
at io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer.serialize(KafkaJsonSchemaSerializer.java:79)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:945)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:905)
at io.confluent.developer.ProducerWithJsonSchemaSerializerApp.lambda$producePurchaseEvents$2(ProducerWithJsonSchemaSerializerApp.java:51)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at io.confluent.developer.ProducerWithJsonSchemaSerializerApp.producePurchaseEvents(ProducerWithJsonSchemaSerializerApp.java:51)
at io.confluent.developer.ProducerWithJsonSchemaSerializerApp.main(ProducerWithJsonSchemaSerializerApp.java:83)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:458)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:434)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:316)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:539)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:118)
... 8 more
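One thing I noticed when comparing the two schemas: the schema in the exception message is not the one I registered. Its property names are camelCase (customerId, totalCost), while the registered schema uses snake_case (customer_id, total_cost). I do not know whether this is the cause of the 40403 error. Below is a minimal sketch of that comparison; the property names are copied by hand from the two schemas above, and the class and method names are made up for illustration only.

```java
import java.util.Set;
import java.util.TreeSet;

// Illustration only: compares the property names of the registered schema
// with those of the schema printed in the exception message.
public class SchemaPropertyDiff {

    // Property names copied by hand from the schema shown in Confluent Cloud.
    static final Set<String> REGISTERED =
            Set.of("customer_id", "desc", "employee_id", "item", "total_cost");

    // Property names copied by hand from the schema in the exception message.
    static final Set<String> FROM_EXCEPTION =
            Set.of("item", "totalCost", "customerId", "employeeId", "desc");

    // Returns the names present in 'a' but missing from 'b', sorted alphabetically.
    static Set<String> onlyIn(Set<String> a, Set<String> b) {
        Set<String> diff = new TreeSet<>(a);
        diff.removeAll(b);
        return diff;
    }

    public static void main(String[] args) {
        System.out.println("Only in registered schema: "
                + onlyIn(REGISTERED, FROM_EXCEPTION));
        System.out.println("Only in schema from exception: "
                + onlyIn(FROM_EXCEPTION, REGISTERED));
    }
}
```

Only "item" and "desc" are common to both; the other three properties differ in naming style.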
I have searched online but have been unable to figure out what is wrong here. I would appreciate any help.