Ksqldb table key and value with avro schema serialization error

Hi,
I created a topic and register Avro schema for the Key and Value. They are both objects. I also verified the schema is registed in the schema registery. Then create a source table in the Ksqldb with below query to pull the data in the .Net. The reason I have to specify the schema id is because I need the property name stay CamelCase as it is defined in the topic.
CREATE source TABLE ExampleTopic WITH (KAFKA_TOPIC=‘ExampleTopic’, KEY_FORMAT=‘Avro’, VALUE_FORMAT=‘Avro’, KEY_SCHEMA_ID=‘1’,VALUE_SCHEMA_ID=‘2’);
Afetr I push the message to the topic, I get below serialization error when I try
select * from ExampleTopic;
It all works fine if I don’t specify the KEY_SCHEMA_ID and VALUE_SCHEMA_ID. However, all the table property name will be upper case. I need to keep them CamelCase. I appriciate any help. Thank you.

Here is the error:
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:127)
at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:152)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic _confluent-ksql-ksqldb-server_transient_transient_ExampleTopic_3196606156607934861_1741290716040-KsqlTopic-Reduce-changelog :
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:53)
… 32 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving schema ID{“type”:“record”,“name”:“XXX”,“namespace”:“XXX”,“fields”:[{“name”:“XXX”,“type”:“string”},{“name”:“XXX”,“type”:“string”}]}
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:253)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:168)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
… 33 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject ‘_confluent-ksql-ksqldb-server_transient_transient_ExampleTopic_3196606156607934861_1741290716040-KsqlTopic-Reduce-changelog-key’ not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:302)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:372)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:463)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:448)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:350)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:565)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:545)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:130)
… 35 more