Deserialization exception in Kafka Streams

  1. I have 2 Kafka clusters, one at site1 and the other at site2.
  2. Kafka mirroring is enabled to the standby site2.
  3. I have a Kafka Streams application with state stores configured, running on site1.

When I perform a switchover and start the same application on site2, I often get a deserialization exception like the one below.

    [2023-12-04 18:37:55,761] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:76)
    org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 22
    Caused by: java.io.IOException: Invalid int encoding
        at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:145)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)

I even tried deleting/cleaning the state store and recreating it from the topic, but that does not help; it looks like the topic data is corrupted.
I am running Confluent 5.3.1.
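
For reference, the reset I attempted looks roughly like this (a minimal sketch using the standard KafkaStreams#cleanUp() call, which wipes the local state directory so stores are rebuilt from their changelog topics; the application ID, broker address, and topic name are placeholders):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class StateReset {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");       // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "site2-broker:9092"); // placeholder
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic"); // placeholder for the real topology with state stores

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.cleanUp(); // deletes the local state directory; only valid before start()
            streams.start();   // state stores are then rebuilt from their changelog topics
        }
    }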

Can anyone help me figure out what the problem could be?

The problem is with the schema ID. The same schema can be assigned a different ID by each site's Schema Registry, and every serialized message embeds the ID assigned by the registry on the producing site.
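
To see why that breaks the consumer, note the wire format the serializer below produces: a magic byte, then the 4-byte schema ID, then the Avro payload. The deserializer on the standby site extracts that ID and resolves it against its own registry, roughly like this (a simplified sketch of the deserializer side, not the actual Confluent source):

    import java.nio.ByteBuffer;

    public class WireFormat {
        // Simplified sketch: how the Avro deserializer reads the header of
        // each record in the Confluent wire format.
        static int extractSchemaId(byte[] payload) {
            ByteBuffer buffer = ByteBuffer.wrap(payload);
            byte magic = buffer.get(); // always 0x0 in the Confluent wire format
            if (magic != 0x0) {
                throw new IllegalArgumentException("Unknown magic byte: " + magic);
            }
            // The consumer resolves this ID against ITS OWN Schema Registry.
            // If site2's registry assigned that ID to a different schema (or
            // to none), Avro decodes with the wrong writer schema and fails
            // mid-record; "Invalid int encoding" is a typical symptom.
            return buffer.getInt();
        }
    }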

Below is the logic for serializing the message to the Kafka topic.

    // Fields such as schemaRegistry, autoRegisterSchema, subjectName, schema,
    // mapper (a Jackson AvroMapper), MAGIC_BYTE, and idSize come from the
    // enclosing serializer class.
    try {
        // The schema ID is assigned by the local Schema Registry, so the same
        // schema can receive a different ID on each site.
        int id;
        if (autoRegisterSchema) {
            id = schemaRegistry.register(subjectName, schema);
        } else {
            id = schemaRegistry.getId(subjectName, schema);
        }

        // Confluent wire format: magic byte, then the 4-byte schema ID,
        // then the Avro-encoded payload.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(MAGIC_BYTE);
        out.write(ByteBuffer.allocate(idSize).putInt(id).array());

        if (data instanceof byte[]) {
            // Payload is already Avro-encoded; write it through unchanged.
            out.write((byte[]) data);
        } else {
            // Encode the object using Jackson's Avro backend.
            AvroSchema avroSchema = new AvroSchema(schema);
            byte[] avroData = mapper.writer(avroSchema).writeValueAsBytes(data);
            out.write(avroData);
        }

        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException | RestClientException e) {
        throw new SerializationException("Error serializing avro message", e);
    }
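
One way to confirm this is to ask both registries what ID they hold for the same subject and compare. A rough sketch against the 5.3.x Schema Registry client API (the registry URLs and subject name are placeholders):

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

    public class SchemaIdCheck {
        public static void main(String[] args) throws Exception {
            SchemaRegistryClient site1 = new CachedSchemaRegistryClient("http://site1-registry:8081", 100);
            SchemaRegistryClient site2 = new CachedSchemaRegistryClient("http://site2-registry:8081", 100);
            String subject = "my-topic-value"; // placeholder: usually <topic>-value

            SchemaMetadata m1 = site1.getLatestSchemaMetadata(subject);
            SchemaMetadata m2 = site2.getLatestSchemaMetadata(subject);

            System.out.printf("site1 id=%d, site2 id=%d%n", m1.getId(), m2.getId());
            // If the IDs differ, messages mirrored from site1 embed an ID that
            // resolves to a different (or missing) schema on site2.
        }
    }

If the IDs do differ, a common remedy is to replicate the Schema Registry data from site1 to site2 rather than letting each site register schemas independently, so the IDs stay consistent across sites.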