I am using a Java-based producer to publish events to Confluent, but I am consistently encountering the following exception: the record is rejected by the record interceptor io.confluent.cloud.kafka.schemaregistry.validator.CloudRecordSchemaValidator
There are no helpful pointers in the log, making it difficult to understand what is going wrong, hence posting here to get experts' advice.
TopicStrategy: TopicRecordNameStrategy
Schema: JSON Schema validation enabled
Java code: (For security reasons, I have obfuscated the keys/secrets in the below code)
package com.shabbir.kafka.schemaregistry.clients.jsonschema.conflluent;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import static java.lang.String.format;
import com.shabbir.model.Employee;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig;
import lombok.SneakyThrows;
/**
 * Publishes {@link Employee} events to a Confluent Cloud topic using the
 * Kafka JSON Schema serializer with the TopicRecordNameStrategy subject
 * naming strategy (subject = "&lt;topic&gt;-com.shabbir.model.Employee").
 *
 * <p>NOTE(review): if the broker-side CloudRecordSchemaValidator keeps
 * rejecting records, verify that a subject with exactly that name exists in
 * the registry and that the schema the serializer resolves with
 * use.latest.version=true matches the registered one — TODO confirm against
 * the registry contents.
 */
public class EventResponseProducer
{
    // Placeholders only — in production load credentials from the
    // environment or a secret store; never hard-code them.
    private static final String CONFLUENT_SERVER_URL = "XXXX";
    private static final String CLUSTER_API_KEY = "XXXX";
    private static final String CLUSTER_API_SECRET = "XXXX";
    private static final String SCHEMA_REGISTRY_URL = "XXXX";
    private static final String SCHEMA_REGISTRY_API_KEY = "XXXX";
    private static final String SCHEMA_REGISTRY_API_SECRET = "XXXX";

    // ObjectMapper is thread-safe and expensive to construct: build it once.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @SneakyThrows
    public static void main(String[] args)
    {
        // Fix: the original leaked the producer. KafkaProducer is
        // AutoCloseable; close() also flushes buffered records before exit.
        try (KafkaProducer<String, Employee> kafkaProducer =
                 new KafkaProducer<>(producerConfig()))
        {
            publishEvents(kafkaProducer, events(), "XXXX");
        }
    }

    /** Builds the producer configuration: SASL_SSL auth plus JSON Schema serialization. */
    private static Map<String, Object> producerConfig()
    {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CONFLUENT_SERVER_URL);
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
            format("org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='%s';",
                CLUSTER_API_KEY, CLUSTER_API_SECRET));
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class.getName());
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        // Use the library-provided constants instead of bare string keys so
        // typos fail at compile time rather than being silently ignored.
        props.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
        props.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG,
            format("%s:%s", SCHEMA_REGISTRY_API_KEY, SCHEMA_REGISTRY_API_SECRET));
        props.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true);
        props.put(AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT, false);
        props.put(KafkaJsonSchemaSerializerConfig.FAIL_INVALID_SCHEMA, true);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
        props.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY,
            "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
        return props;
    }

    /**
     * Logs and publishes each event keyed by a random UUID, blocking on every
     * send so a failure surfaces immediately instead of being lost.
     * (Replaces the original stream pipeline that used {@code peek} for side
     * effects — peek is meant for debugging only.)
     */
    private static void publishEvents(
        KafkaProducer<String, Employee> kafkaProducer, List<Employee> events, String topic)
    {
        for (Employee event : events)
        {
            logEvent(event);
            Future<RecordMetadata> ack =
                kafkaProducer.send(new ProducerRecord<>(topic, UUID.randomUUID().toString(), event));
            waitForProducer(ack);
        }
    }

    /** Prints the event as JSON; rethrows the checked Jackson exception unchecked. */
    private static void logEvent(Employee event)
    {
        try
        {
            System.out.println("Record: " + MAPPER.writeValueAsString(event));
        }
        catch (JsonProcessingException e)
        {
            throw new RuntimeException(e);
        }
    }

    /** Sample payload: one hard-coded employee. */
    private static List<Employee> events()
    {
        return List.of(new Employee("shabbir", "panjesha", "123456"));
    }

    /** Blocks until the broker acknowledges the record (or the send fails). */
    @SneakyThrows
    private static void waitForProducer(Future<RecordMetadata> recordMetadataFuture)
    {
        recordMetadataFuture.get();
    }
}
JSON Schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Employee",
"type": "object",
"additionalProperties": false,
"javaType": "com.shabbir.model.Employee",
"$id": "com.shabbir.model.employee.schema.json",
"properties": {
"firstName": {
"type": "string"
},
"lastName": {
"type": "string"
},
"gpn": {
"type": "string"
}
},
"required": [
"firstName",
"lastName",
"gpn"
]
}
Java Model class
package com.shabbir.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaString;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import javax.validation.constraints.NotNull;
/**
 * Employee payload serialized against the draft-07 JSON Schema injected
 * below; every field is required both by the schema (@JsonProperty) and by
 * bean validation (@NotNull).
 *
 * Lombok boilerplate annotations replaced with explicit, equivalent
 * constructors and accessors.
 */
@Schema
@JsonSchemaInject(strings = {
    @JsonSchemaString(path = "$schema", value = "http://json-schema.org/draft-07/schema#"),
    @JsonSchemaString(path = "javaType", value = "com.shabbir.model.Employee"),
    @JsonSchemaString(path = "$id", value = "com.shabbir.model.employee.schema.json"),
    @JsonSchemaString(path = "title", value = "Employee")
})
public class Employee
{
    @JsonProperty(required = true)
    @NotNull(message = "The firstName field is mandatory.")
    private String firstName;

    @JsonProperty(required = true)
    @NotNull(message = "The lastName field is mandatory.")
    private String lastName;

    @JsonProperty(required = true)
    @NotNull(message = "The gpn field is mandatory.")
    private String gpn;

    /** No-args constructor required by Jackson deserialization. */
    public Employee()
    {
    }

    /** All-args constructor, mirroring Lombok's @AllArgsConstructor field order. */
    public Employee(String firstName, String lastName, String gpn)
    {
        this.firstName = firstName;
        this.lastName = lastName;
        this.gpn = gpn;
    }

    public String getFirstName()
    {
        return firstName;
    }

    public void setFirstName(String firstName)
    {
        this.firstName = firstName;
    }

    public String getLastName()
    {
        return lastName;
    }

    public void setLastName(String lastName)
    {
        this.lastName = lastName;
    }

    public String getGpn()
    {
        return gpn;
    }

    public void setGpn(String gpn)
    {
        this.gpn = gpn;
    }
}