Schema validation - can't produce a record when validation is switched on: record rejected by the record interceptor

Hi,
I’m working through producing avro messages and applying message validation - against schema - at message produce time. But I am seeing an error i don’t really understand.

I’m doing the following.

  1. Run Confluent locally.
  2. Create a topic called “flow”.
  3. Publish a message to that topic, using kafka-avro-console-producer, string serializer for the key and a simple avro schema with 2 fields for the value, (schema below) e.g. abc:{“first”:3, “second”: “foo”}

{ “type”: “record”, “name”: “demoRecord”,
“fields”: [
{ “name”: “first”, “type”: “int” },
{ “name”: “second”, “type”: “string” }
]
}

  1. confirm schema has been auto-registered in schema registry following message being produced, e.g.

curl --silent -X GET http://localhost:8081/subjects | jq
[
“flow-value”
]

  1. “Turn on” schema/message validation, e.g.

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name flow --add-config confluent.value.schema.validation=true
Completed updating config for topic flow.

  1. Try and publish the same message again, again using an avro console producer. Kafka now errors, with output:

[2023-05-10 22:07:42,769] ERROR Error when sending message to topic flow with key: 3 bytes, value: 10 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52)
org.apache.kafka.common.InvalidRecordException: Log record DefaultRecord(offset=0, timestamp=1683752861715, key=3 bytes, value=10 bytes) is rejected by the record interceptor io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator

Is the output telling me the message isn’t valid against the registered schema subject “flow-value”?

Problem turned out to be simple, just needed more info. Consider, when copying and pasting, whether examples in docs are appropriate for your setup. I blindly/ mistakenly copied and pasted the example from Broker-side Schema ID Validation | Confluent Documentation,

confluent.schema.registry.url=http://schema-registry:8081

…needed to use localhost.

Checking the schema registry logs helps pin down the problem,

[2023-06-22 16:01:46,570] INFO RecordSchemaValidatorConfig values: 
        confluent.basic.auth.credentials.source = 
        confluent.basic.auth.user.info = [hidden]
        confluent.bearer.auth.credentials.source = 
        confluent.bearer.auth.token = [hidden]
        confluent.key.schema.validation = false
        confluent.key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
        confluent.missing.id.cache.ttl.sec = 60
        confluent.missing.id.query.range = 200
        confluent.missing.schema.cache.ttl.sec = 60
        confluent.schema.registry.max.cache.size = 10000
        confluent.schema.registry.max.retries = 1
        confluent.schema.registry.retries.wait.ms = 0
        **confluent.schema.registry.url = [http://schema-registry:8081]**

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.