Record is rejected by the record interceptor io.confluent.cloud.kafka.schemaregistry.validator.CloudRecordSchemaValidator

I am using a Java-based producer to publish events to Confluent Cloud, but I consistently encounter the following exception: the record "is rejected by the record interceptor io.confluent.cloud.kafka.schemaregistry.validator.CloudRecordSchemaValidator".

There are no helpful pointers in the log, which makes it difficult to understand what’s going wrong, so I am posting here to get expert advice.

TopicStrategy: TopicRecordNameStrategy
Schema: JSON Schema validation enabled
Java code (for security reasons, I have obfuscated the keys/secrets in the code below):

package com.shabbir.kafka.schemaregistry.clients.jsonschema.conflluent;

import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;

import static java.lang.String.format;

import com.shabbir.model.Employee;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig;
import lombok.SneakyThrows;

/**
 * Stand-alone producer that publishes {@link Employee} events to a Kafka topic using
 * {@link KafkaJsonSchemaSerializer} for the record value and {@link StringSerializer} for the key.
 *
 * <p>The serializer is configured to not auto-register schemas, to use the latest registered
 * schema version, and to resolve the value subject with {@code TopicRecordNameStrategy}.
 *
 * <p>NOTE(review): when broker-side schema validation is enabled on the topic, the broker
 * presumably resolves the subject with the topic's own subject-name-strategy setting rather than
 * with this producer's {@code value.subject.name.strategy}. If the two differ, records may be
 * rejected by the broker's record interceptor — confirm the topic configuration.
 */
public class EventResponseProducer
{

    private static final String CONFLUENT_SERVER_URL = "XXXX";
    private static final String CLUSTER_API_KEY = "XXXX";
    private static final String CLUSTER_API_SECRET = "XXXX";
    private static final String SCHEMA_REGISTRY_URL = "XXXX";
    private static final String SCHEMA_REGISTRY_API_KEY = "XXXX";
    private static final String SCHEMA_REGISTRY_API_SECRET = "XXXX";

    /** Shared mapper used only to render each event as JSON for the console output. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Builds the producer, publishes the sample events and closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    @SneakyThrows
    public static void main(String[] args)
    {
        // try-with-resources guarantees close() runs even when publishing fails, so buffered
        // records are flushed and the producer's network resources are released.
        try (var kafkaProducer = new KafkaProducer<String, Employee>(producerConfig()))
        {
            publishEvents(kafkaProducer, events(), "XXXX");
        }
    }

    /**
     * Assembles the producer configuration: cluster connection and SASL credentials,
     * serializers, Schema Registry access and schema-resolution behaviour.
     *
     * @return a mutable map of producer properties
     */
    private static Map<String, Object> producerConfig()
    {
        Map<String, Object> props = new HashMap<>();

        // Cluster connection and authentication.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CONFLUENT_SERVER_URL);
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
            format("org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='%s';",
                CLUSTER_API_KEY, CLUSTER_API_SECRET));
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");

        // Serializers.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class.getName());

        // Schema Registry access.
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", format("%s:%s", SCHEMA_REGISTRY_API_KEY, SCHEMA_REGISTRY_API_SECRET));

        // Schema resolution: never register from the client, always use the latest registered version.
        props.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true);
        props.put(AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT, false);
        props.put(KafkaJsonSchemaSerializerConfig.FAIL_INVALID_SCHEMA, true);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
        props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
        return props;
    }

    /**
     * Sends every event to {@code topic} under a random UUID key, printing the JSON form of the
     * event first and blocking until each send has completed before moving on to the next one.
     *
     * @param kafkaProducer producer used to send the records
     * @param events        events to publish, in order
     * @param topic         destination topic name
     */
    private static void publishEvents(
        KafkaProducer<String, Employee> kafkaProducer, List<Employee> events, String topic)
    {
        for (Employee event : events)
        {
            System.out.println("Record: " + toJson(event));
            ProducerRecord<String, Employee> producerRecord =
                new ProducerRecord<>(topic, UUID.randomUUID().toString(), event);
            waitForProducer(kafkaProducer.send(producerRecord));
        }
    }

    /**
     * Renders an event as a JSON string.
     *
     * @param event event to render
     * @return JSON text produced by the shared {@link ObjectMapper}
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if rendering fails
     */
    private static String toJson(Employee event)
    {
        try
        {
            return MAPPER.writeValueAsString(event);
        }
        catch (JsonProcessingException e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Supplies the sample events to publish.
     *
     * @return an immutable list with a single {@link Employee}
     */
    private static List<Employee> events()
    {
        return List.of(new Employee("shabbir", "panjesha", "123456"));
    }

    /**
     * Blocks until the given send has completed. Any failure of the send is rethrown
     * (via {@code @SneakyThrows}) so the caller stops on the first rejected record.
     *
     * @param recordMetadataFuture future returned by {@code KafkaProducer.send}
     */
    @SneakyThrows
    private static void waitForProducer(Future<RecordMetadata> recordMetadataFuture)
    {
        try
        {
            recordMetadataFuture.get();
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag before propagating so callers can observe the interruption.
            Thread.currentThread().interrupt();
            throw e;
        }
    }
}

JSON Schema

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Employee",
  "type": "object",
  "additionalProperties": false,
  "javaType": "com.shabbir.model.Employee",
  "$id": "com.shabbir.model.employee.schema.json",
  "properties": {
    "firstName": {
      "type": "string"
    },
    "lastName": {
      "type": "string"
    },
    "gpn": {
      "type": "string"
    }
  },
  "required": [
    "firstName",
    "lastName",
    "gpn"
  ]
}

Java Model class

package com.shabbir.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaString;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import javax.validation.constraints.NotNull;

/**
 * Employee event payload used as the record value by the producer.
 *
 * <p>Accessors, mutators and both the no-argument and all-arguments constructors are generated
 * by the Lombok annotations on this class. The {@code @JsonSchemaInject} entries add the
 * {@code $schema}, {@code javaType}, {@code $id} and {@code title} attributes to the schema
 * generated from this class.
 *
 * <p>All three fields are declared required for JSON ({@code @JsonProperty(required = true)})
 * and non-null for bean validation ({@code @NotNull}).
 */
@Getter
@Setter
@Schema
@NoArgsConstructor
@AllArgsConstructor
@JsonSchemaInject(strings = {
        @JsonSchemaString(path = "$schema", value = "http://json-schema.org/draft-07/schema#"),
        @JsonSchemaString(path = "javaType", value = "com.shabbir.model.Employee"),
        @JsonSchemaString(path = "$id", value = "com.shabbir.model.employee.schema.json"),
        @JsonSchemaString(path = "title", value = "Employee")
})
public class Employee
{
    // Employee's first name; mandatory.
    @JsonProperty(required = true)
    @NotNull(message = "The firstName field is mandatory.")
    private String firstName;
    // Employee's last name; mandatory.
    @JsonProperty(required = true)
    @NotNull(message = "The lastName field is mandatory.")
    private String lastName;
    // Mandatory string identifier. NOTE(review): the meaning of "gpn" is not shown here — confirm.
    @JsonProperty(required = true)
    @NotNull(message = "The gpn field is mandatory.")
    private String gpn;
}