I currently have an event stream with a Kafka topic that sends a schema-registry manager event through Java Spring Kafka. On the producer side, I’m sending with no problems:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
I can receive these messages without any problem by specifying the Kafka receiver as follows:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
I already have access to the generated class Java class that is pulled from the schema registry and using this config I can deserialize no problem.
Where I am facing an issue is that I have a consumer that has to receive the message using:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
No matter what I try implementation wise, everything I face throws the following exceptions depending on implementation:
org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -55
This is the only prior I can find. I have tried deserialization with and without the schema and the same errors occur. I do have access to the schema at deserialization time, the same generated Java used to send the message on the producer side.
I have attached two example impls to the bottom of this post to show what has not worked:
The following has been tried with and without the specific schema, as well as deserializing to a single object vs an array of objects or to a GenericRecord:
LOG.info("Recieved Event: {}", event);
LOG.debug("data='{}'", DatatypeConverter.printHexBinary(event));
ByteArrayInputStream in = new ByteArrayInputStream(event);
DatumReader<SpecificSchemaObject> userDatumReader = new SpecificDatumReader<>(SpecificSchemaObject.getClassSchema());
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
List<SpecificSchemaObject> records = new ArrayList<SpecificSchemaObject>();
SpecificSchemaObject result = null;
try {
while (true) {
try {
SpecificSchemaObject record = userDatumReader.read(null, decoder);
records.add(record);
} catch (EOFException eof) {
break;
}
}
result = (SpecificSchemaObject) userDatumReader.read(null, decoder);
LOG.info("deserialized data='{}'", records);
result = (SpecificSchemaObject) records;
} catch (IOException ioe) {
}
Schema schema = WebsiteAdminEvent.getClassSchema();
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(event), null);
GenericRecord gr = null;
try {
gr = datumReader.read(null, decoder);
} catch (IOException e) {
throw new RuntimeException(e);
}
LOG.info("got record: ", gr);
List<WebsiteAdminEvent> listOfRecords = new ArrayList<>();
DatumReader<WebsiteAdminEvent> reader = new GenericDatumReader<>();
DataFileReader<WebsiteAdminEvent> fileReader =
null;
try {
fileReader = new DataFileReader<>(new SeekableByteArrayInput(event), reader);
WebsiteAdminEvent record = null;
while (fileReader.hasNext()) {
listOfRecords.add(fileReader.next(record));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
```