Multiple event types in the same topic

Hi,

I am following this blog post to create a multi-type schema so that several event types can share the same topic: https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/

Below is the schema, which I registered with the Schema Registry Maven plugin (mvn schema-registry:register).

After registering, the console log shows:

[INFO] Registered subject(emp-address-updated) with id 867 version 1
[INFO] Registered subject(emp-created) with id 868 version 1
[INFO] Registered subject(emp-department-changed) with id 869 version 1
[INFO] Registered subject(emp-types-value) with id 870 version 1

And the schema in Confluent Control Center looks like this:

{
   "fields": [
     {
       "name": "employeeType",
       "type": [
         "EmpCreated",
         "EmpDepartmentChanged",
         "EmpAddressUpdated"
       ]
     }
   ],
   "name": "EmpTypes",
   "namespace": "com.domain.messaging.resource",
   "type": "record"
 }

When I try to produce an event of type EmpCreated using the producer properties below, I get the error shown further down:

properties:

key.serializer: org.apache.kafka.common.serialization.IntegerSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: <schema-registry-url>
auto.register.schemas: false
use.latest.version: true
value.schema.id: 870

error:

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230) ~[kafka-schema-registry-client-5.3.0.jar:na]
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256) ~[kafka-schema-registry-client-5.3.0.jar:na]
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:323) ~[kafka-schema-registry-client-5.3.0.jar:na]
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:311) ~[kafka-schema-registry-client-5.3.0.jar:na]
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:191) ~[kafka-schema-registry-client-5.3.0.jar:na]
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:323) ~[kafka-schema-registry-client-5.3.0.jar:na]
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82) ~[kafka-avro-serializer-4.0.0.jar:na]
	at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-4.0.0.jar:na]
	at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar:na]
	at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.1.jar:na]
	at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:218) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEAS

What am I missing here, could you please suggest?

It depends on how you registered the schemas. But since you want to store multiple event types in the same topic, the subject name strategy should be RecordNameStrategy or TopicRecordNameStrategy, as in bkes-demo/clients.clj at 7d86b9b2ef2b014c0ef85b045de8559013d9e9ee · gklijs/bkes-demo · GitHub.
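
For reference, a minimal sketch of how such a strategy could be set on the producer. The keys are the ones from the question plus value.subject.name.strategy, and the strategy class is the RecordNameStrategy that ships with the Confluent serializers (TopicRecordNameStrategy lives in the same package). The class name ProducerProps and the registryUrl argument are placeholders for illustration, and the fragment leaves out bootstrap.servers and the rest of the producer setup.

```java
import java.util.Properties;

class ProducerProps {
    // Builds producer properties that resolve subjects by record name
    // instead of by topic name. registryUrl is a placeholder argument.
    static Properties recordNameProps(String registryUrl) {
        Properties props = new Properties();
        props.setProperty("key.serializer",
            "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer",
            "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.setProperty("schema.registry.url", registryUrl);
        props.setProperty("auto.register.schemas", "false");
        // Subject = fully qualified Avro record name of the value being sent.
        props.setProperty("value.subject.name.strategy",
            "io.confluent.kafka.serializers.subject.RecordNameStrategy");
        return props;
    }
}
```

use.latest.version is left out here because each event type is looked up under its own subject. It is also worth verifying the serializer version: as far as I know that setting, like schema references, is only honored from Confluent Platform 5.5 onward, while the stack trace shows kafka-avro-serializer-4.0.0.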

Hi,

I used the Maven schema-registry:register goal to register the schema with references, following exactly what the blog post describes in the section “Getting started with schema references”.

According to that section, the default TopicNameStrategy should work. My subject name is emp-types-value and the topic name is emp-types.
I still get the same “Schema not found; error code: 40403” error. Please let me know your thoughts.
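
To make the subject question concrete, here is a small sketch of how the three built-in strategies derive the subject that the serializer looks up. It mirrors the documented naming rules rather than calling the Confluent classes, and the class and method names are made up for illustration. It also assumes that EmpCreated lives in the com.domain.messaging.resource namespace, like EmpTypes.

```java
class SubjectNames {
    // TopicNameStrategy (the default): "<topic>-key" or "<topic>-value".
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the fully qualified Avro record name.
    static String recordName(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: "<topic>-<fully qualified record name>".
    static String topicRecordName(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }
}
```

For the topic emp-types the default strategy gives emp-types-value, which matches the registered subject. With auto.register.schemas: false, though, the serializer in the stack trace appears to look up the schema of the object being sent under that subject (the lookUpSubjectVersion call), so an EmpCreated record would only be found if that exact schema is registered under emp-types-value. Under RecordNameStrategy the subject would be com.domain.messaging.resource.EmpCreated, not emp-created.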

How is your pom file configured? Are you using Spring Boot?

Please ignore my previous email. The plugin looks something like this. Yes, I am using a Spring Boot app.

Have you tried writing the Avro union EmpTypes.avsc like this:

[
"com.domain.messaging.resource.EmpCreated",
"com.domain.messaging.resource.EmpDepartmentChanged",
...
]

I solved it by splitting the definitions of the Avro schemas and the Avro union into two different folders.

I tried this approach too and still get the same error.
Could it be related to value.schema.id?

Have a look at this project: GitHub - gpietro/spring-boot-kafka-demo. It does exactly what you are asking.

Do you register the union in the schema registry?
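
One way to check is to ask the registry what it currently holds under the subject. Below is a minimal sketch using the Schema Registry REST endpoint GET /subjects/{subject}/versions/latest and the HttpClient from Java 11. The class name RegistryCheck and the localhost URL in the usage note are placeholders, not values from this thread.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class RegistryCheck {
    // Builds the URL of the latest registered version for a subject.
    static URI latestVersionUri(String registryUrl, String subject) {
        return URI.create(registryUrl + "/subjects/" + subject + "/versions/latest");
    }

    // Fetches the latest version as a raw JSON string (needs a reachable registry).
    static String fetchLatest(String registryUrl, String subject) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(latestVersionUri(registryUrl, subject))
            .header("Accept", "application/vnd.schemaregistry.v1+json")
            .GET()
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

Calling RegistryCheck.fetchLatest("http://localhost:8081", "emp-types-value") should return the id, version and schema text stored under that subject, which shows whether the union (or the wrapper record) is really what got registered.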