Multiple Event types in same topic with unions

I read the article with interest

[https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/]

I am trying to set up a topic that handles multiple event types safely, using schema references as the article suggests.

I was able to register the two schemas customer and product successfully and verified that the subject and schema are created.

shell> curl -X GET http://localhost:8081/subjects/customer/versions/1

{"subject":"customer","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Customer\",\"namespace\":\"io.confluent.examples.avro\",\"fields\":[{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"customer_name\",\"type\":\"string\"},{\"name\":\"customer_email\",\"type\":\"string\"},{\"name\":\"customer_address\",\"type\":\"string\"}]}"}

shell> curl -X GET http://localhost:8081/subjects/product/versions/1

{"subject":"product","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Product\",\"namespace\":\"io.confluent.examples.avro\",\"fields\":[{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"product_name\",\"type\":\"string\"},{\"name\":\"product_price\",\"type\":\"double\"}]}"}

Then I created a new topic, all_types, in Control Center. When I try to assign a schema to the value of the topic (all_types) using the above references, I get the following error.

Invalid schema [ { "name": "io.confluent.examples.avro.Customer", "subject": "customer", "version": 1 }, { "name": "io.confluent.examples.avro.Product", "subject": "product", "version": 1 } ] with refs [] of type AVRO

Any idea what I am doing wrong?
I tried with Confluent Platform 5.5.3 (and 6.1.1). Both fail with a similar error.

I am following the blog article closely, but I suspect that a list of references is missing when the schema is added. However, I am not sure how to specify the reference list for the union in the topic schema.

Thanks,

Hi,

Control Center does not yet support schema references. To register a schema with references, you can use the Schema Registry REST API (see "Schema Registry API Reference" in the Confluent documentation) or the Schema Registry Maven Plugin (also described in the Confluent documentation). Also, one of the community members contributed a set of examples on GitHub at gklijs/schema_registry_test_app, a Java application that consumes the different types of messages supported by Schema Registry and produces a similar kind to the same topic.


Thank you so much for a prompt response.

I had used the API (via a python script) to register the subject and the initial reference schemas (directly from the avsc file) and that had worked fine.

But I then switched to the console (Control Center) for the topic schema, which did not work, as described above.

We will try it soon and get back.

Thanks.

@ryokota

As suggested, I have now tried to register the topic schema (as a union of references) directly using the API.

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{ "schema": "[ \"io.confluent.examples.avro.Customer\", \"io.confluent.examples.avro.Product\"]" }' http://localhost:8081/subjects/all-types-value/versions

I get the same error as before:

{"error_code":42201,"message":"Either the input schema or one its references is invalid"}

Is anything wrong with my API usage?

Just for reference here are the subjects, successfully registered.

curl -X GET http://localhost:8081/subjects/product/versions/1

{"subject":"product","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Product\",\"namespace\":\"io.confluent.examples.avro\",\"fields\":[{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"product_name\",\"type\":\"string\"},{\"name\":\"product_price\",\"type\":\"double\"}]}"}

curl -X GET http://localhost:8081/subjects/customer/versions/1

{"subject":"customer","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Customer\",\"namespace\":\"io.confluent.examples.avro\",\"fields\":[{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"customer_name\",\"type\":\"string\"},{\"name\":\"customer_email\",\"type\":\"string\"},{\"name\":\"customer_address\",\"type\":\"string\"}]}"}

Correction to my previous response…

The way I registered the topic schema is actually this:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{ "schema": "[ { \"name\": \"io.confluent.examples.avro.Customer\", \"subject\": \"customer\", \"version\": 1 }, { \"name\": \"io.confluent.examples.avro.Product\", \"subject\": \"product\", \"version\": 1 }]" }' http://localhost:8081/subjects/all-types-value/versions

Same error as before.

You need to include an array of references in the POST body, alongside the schema. For an example, see the "Example Request" in the Schema Registry API Reference in the Confluent documentation.

Thank you for your suggestion. It worked.

Just for the record, this is what I did.

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json"   --data @allTypes.json  http://localhost:8081/subjects/all-types-value/versions
{"id":4}

and the text file allTypes.json looks like this:

{ "schema": "[  \"io.confluent.examples.avro.Customer\", \"io.confluent.examples.avro.Product\" ]",
  "references" : [
	  {
	   "name" : "io.confluent.examples.avro.Customer",
	   "subject" : "customer",
	   "version" : 1
      },
	  {
   	   "name" : "io.confluent.examples.avro.Product",
   	   "subject" : "product",
   	   "version" : 1
	  } 
	]
}
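For anyone scripting this rather than maintaining the JSON file by hand, a request body of this shape can also be generated. A minimal Python sketch, assuming the same subjects and versions as in this thread; the function name build_union_request is hypothetical, and the code only builds the body without calling the registry:

```python
import json


def build_union_request(members):
    """Build a Schema Registry request body for a top-level Avro union.

    members: list of (fully qualified record name, subject, version) tuples.
    """
    # The union itself is just a JSON array of fully qualified type names,
    # stored as an escaped string in the "schema" field.
    union = [name for name, _subject, _version in members]
    return {
        "schema": json.dumps(union),
        # Each name used in the union needs a matching entry here so the
        # registry can resolve it to a registered subject and version.
        "references": [
            {"name": name, "subject": subject, "version": version}
            for name, subject, version in members
        ],
    }


body = build_union_request([
    ("io.confluent.examples.avro.Customer", "customer", 1),
    ("io.confluent.examples.avro.Product", "product", 1),
])

print(json.dumps(body, indent=2))
```

The printed JSON could be saved to a file and sent with the same curl command as above, or posted from the script with the Content-Type header application/vnd.schemaregistry.v1+json.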

Thank you for your help.
I will now start to code the Java part. I expect that in the consumer I will need to poll for a SpecificRecord and cast it to the actual type sent in the event.
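The Java consumer itself is not shown in this thread. As a rough illustration of the dispatch step only, here is a Python sketch in which plain dataclasses stand in for the generated Avro classes. The fields mirror the registered Customer and Product schemas; the describe function is hypothetical, and no Kafka client or deserializer is involved:

```python
from dataclasses import dataclass


@dataclass
class Customer:
    customer_id: int
    customer_name: str
    customer_email: str
    customer_address: str


@dataclass
class Product:
    product_id: int
    product_name: str
    product_price: float


def describe(event):
    """Branch on the concrete type of an event read from the shared topic."""
    if isinstance(event, Customer):
        return f"customer {event.customer_id}: {event.customer_name}"
    if isinstance(event, Product):
        return f"product {event.product_id}: {event.product_name} at {event.product_price}"
    # Anything outside the union is a programming or schema error.
    raise TypeError(f"unexpected event type: {type(event).__name__}")


print(describe(Product(product_id=7, product_name="widget", product_price=9.5)))
```

In Java the equivalent check would presumably be an instanceof test on the deserialized record before the cast, but that depends on how the deserializer is configured.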

