Hi,
I am following the blog post "Multiple Event Types in the Same Kafka Topic - Revisited" to publish multiple event types to the same topic.
I registered the schemas with the Maven plugin, using mvn schema-registry:register.
After registering, the console log shows:
Registered subject(emp-address-updated) with id 867 version 1
[INFO] Registered subject(emp-created) with id 868 version 1
[INFO] Registered subject(emp-department-changed) with id 869 version 1
[INFO] Registered subject(emp-types-value) with id 870 version 1
The emp-types-value schema, as shown in Confluent Control Center, looks like this:
{
  "fields": [
    {
      "name": "employeeType",
      "type": [
        "EmpCreated",
        "EmpDepartmentChanged",
        "EmpAddressUpdated"
      ]
    }
  ],
  "name": "EmpTypes",
  "namespace": "com.domain.messaging.resource",
  "type": "record"
}
When I try to produce an event of type EmpCreated with the producer properties below, I get the error shown after them:
properties:
key.serializer: org.apache.kafka.common.serialization.IntegerSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: <schema-registry-url>
auto.register.schemas: false
use.latest.version: true
value.schema.id: 870
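For reference, here is a minimal sketch of the same settings written as plain Java producer configuration, in case that is easier to reproduce than my Spring setup. The class name ProducerProps and the registry URL passed in main are hypothetical placeholders. The keys and values are copied verbatim from the properties above, and I have not confirmed that every key is recognized by the serializer version on my classpath.

```java
import java.util.Properties;

// Hypothetical helper class; the name is for illustration only.
class ProducerProps {

    // Builds the producer settings listed above as a java.util.Properties object.
    // registryUrl stands in for the real Schema Registry URL.
    static Properties build(String registryUrl) {
        Properties props = new Properties();
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.setProperty("schema.registry.url", registryUrl);
        // Copied as-is from my configuration; whether the serializer
        // honors each of these keys is an assumption.
        props.setProperty("auto.register.schemas", "false");
        props.setProperty("use.latest.version", "true");
        props.setProperty("value.schema.id", "870");
        return props;
    }

    public static void main(String[] args) {
        // Assumed local URL, not my real registry address.
        Properties props = build("http://localhost:8081");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would then be passed to the KafkaProducer (or to the Spring producer factory) unchanged.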
error:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230) ~[kafka-schema-registry-client-5.3.0.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256) ~[kafka-schema-registry-client-5.3.0.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:323) ~[kafka-schema-registry-client-5.3.0.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:311) ~[kafka-schema-registry-client-5.3.0.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:191) ~[kafka-schema-registry-client-5.3.0.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:323) ~[kafka-schema-registry-client-5.3.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82) ~[kafka-avro-serializer-4.0.0.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-4.0.0.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:218) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEAS
What am I missing here? Any suggestions would be appreciated.