Deserializing a byte[] from a Kafka ByteArrayDeserializer into a GenericRecord or SpecificRecord with a schema

I currently have an event stream backed by a Kafka topic that carries schema-registry-managed events through Java Spring Kafka. On the producer side, I'm sending with no problems:

```java
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
```

I can receive these messages without any problem by configuring the Kafka consumer as follows:

```java
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
```

I already have access to the generated Java class pulled from the schema registry, and with this config I can deserialize without any problem.
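For completeness, the rest of that working consumer config looks roughly like this (the registry URL is a placeholder; `specific.avro.reader` is what makes `KafkaAvroDeserializer` return the generated class rather than a `GenericRecord`):

```java
// Sketch of the remaining consumer properties; the URL is a placeholder.
// "specific.avro.reader" tells KafkaAvroDeserializer to return the generated
// SpecificRecord class instead of a GenericRecord.
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", true);
```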

Where I am facing an issue is with a consumer that has to receive the message using:

```java
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
```

No matter what I try, every implementation throws one of the following exceptions:

```
org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -55
```

These are the only leads I can find. I have tried deserializing with and without the schema, and the same errors occur. I do have access to the schema at deserialization time: the same generated Java class used to send the message on the producer side.

I have attached two example implementations at the bottom of this post to show what has not worked. Both have been tried with and without the specific schema, deserializing to a single object, to a list of objects, and to a GenericRecord:

LOG.info("Recieved Event: {}", event);
    LOG.debug("data='{}'", DatatypeConverter.printHexBinary(event));
    ByteArrayInputStream in = new ByteArrayInputStream(event);
    DatumReader<SpecificSchemaObject> userDatumReader = new SpecificDatumReader<>(SpecificSchemaObject.getClassSchema());
    BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
    List<SpecificSchemaObject> records = new ArrayList<SpecificSchemaObject>();
    SpecificSchemaObject result = null;
    try {
      while (true) {
        try {
          SpecificSchemaObject record = userDatumReader.read(null, decoder);
          records.add(record);
        } catch (EOFException eof) {
          break;
        }
      }
      result = (SpecificSchemaObject) userDatumReader.read(null, decoder);
      LOG.info("deserialized data='{}'", records);
      result = (SpecificSchemaObject) records;
    } catch (IOException ioe) {

    }
```java
// Attempt 2a: decode as a GenericRecord using the locally generated schema.
Schema schema = WebsiteAdminEvent.getClassSchema();
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
Decoder decoder = DecoderFactory.get()
    .directBinaryDecoder(new ByteArrayInputStream(event), null);
GenericRecord gr;
try {
  gr = datumReader.read(null, decoder);
} catch (IOException e) {
  throw new RuntimeException(e);
}
LOG.info("got record: {}", gr);

// Attempt 2b: treat the bytes as an Avro container file. This is the variant
// that throws "InvalidAvroMagicException: Not an Avro data file."
List<WebsiteAdminEvent> listOfRecords = new ArrayList<>();
DatumReader<WebsiteAdminEvent> reader = new GenericDatumReader<>();
try (DataFileReader<WebsiteAdminEvent> fileReader =
         new DataFileReader<>(new SeekableByteArrayInput(event), reader)) {
  while (fileReader.hasNext()) {
    listOfRecords.add(fileReader.next());
  }
} catch (IOException e) {
  throw new RuntimeException(e);
}
```

This seems to be a clients or schema-registry question rather than a Kafka Streams one.

What I don’t understand: why do you want to use the byte-array deserializer to begin with and deserialize the data manually? If using io.confluent.kafka.serializers.KafkaAvroDeserializer.class works, why don’t you just use it?
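Even if this particular consumer must be configured with ByteArrayDeserializer, you can still hand the raw bytes to the Confluent deserializer programmatically. A minimal sketch, assuming the registry URL and topic name are placeholders for your own, and `event` is the byte[] from your consumer code:

```java
import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

// Configure a standalone KafkaAvroDeserializer and feed it the raw bytes.
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "http://localhost:8081"); // placeholder
config.put("specific.avro.reader", true); // return the generated class, not GenericRecord

KafkaAvroDeserializer avroDeserializer = new KafkaAvroDeserializer();
avroDeserializer.configure(config, false); // false = this is the value, not the key

Object value = avroDeserializer.deserialize("my-topic", event); // topic name is a placeholder
WebsiteAdminEvent typed = (WebsiteAdminEvent) value;
avroDeserializer.close();
```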

It seems that in your manual deserialization step you are not using the Confluent deserializer but a plain Avro deserializer. In that case, you need to know that the Confluent Avro serializer prepends a 5-byte header, one magic byte plus a 4-byte schema id for the schema registry, before the Avro binary payload. That header is why a plain binary decoder reports a negative length, and DataFileReader fails because the bytes are a single binary-encoded record, not an Avro container file. So you need to cut off the first 5 bytes to make it work.
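A rough sketch of that approach, assuming the writer schema is the same WebsiteAdminEvent schema you have locally (if producers may register newer schema versions, you would instead need to fetch the writer schema from the registry by the id in the header):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

// Confluent wire format: [magic byte 0x0][4-byte schema id][Avro binary payload].
ByteBuffer buffer = ByteBuffer.wrap(event);
byte magic = buffer.get();      // 0x0 for Confluent-serialized messages
int schemaId = buffer.getInt(); // id of the writer schema in the registry (unused here)

// Decode the payload, skipping the 5 header bytes, with the locally generated schema.
DatumReader<WebsiteAdminEvent> reader =
    new SpecificDatumReader<>(WebsiteAdminEvent.getClassSchema());
BinaryDecoder decoder = DecoderFactory.get()
    .binaryDecoder(event, 5, event.length - 5, null);
try {
  WebsiteAdminEvent record = reader.read(null, decoder);
  LOG.info("deserialized data='{}'", record);
} catch (IOException e) {
  throw new RuntimeException(e);
}
```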