Hello everyone, I'm having a particularly 'nigglesome' problem with interceptors that I desperately need help with.
I'm using a ProducerInterceptor and a ConsumerInterceptor within a Spring Cloud Stream application, configured via a StreamsBuilderFactoryConfig bean. They fire as expected and I can retrieve some attributes of the producer and consumer records; however, there is a problem retrieving the actual key and value of these records. I've tried to assign/cast them to the expected types but I get a ClassCastException. My initial instinct suggests this might be a serialisation/deserialisation issue, but there is no indication of that elsewhere, as the application otherwise works fine. The consumer binder has its default serde set as:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

whilst the producer binder has its key/value serdes set as:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Example output from printing the key/value of the consumer and producer records is:

ProducerRecord(topic=.supplier.batch.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=[B@1089613d, value=[B@4a4e10fd, timestamp=1623572275792

Received Message timestamp =1623571603723, partition =3, offset = 92, key = [B@49e41727, value = [B

The following exception is thrown when trying to assign the key to a String instance:
Error executing interceptor onConsume callback
java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')

I've copied the onConsume method of the consumer interceptor to provide a little more detail:

@Override
public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> records) {
    log.info("---<< ProducerRecord being received {} >>---", records);
    for (ConsumerRecord<String, Object> record : records) {
        if (record.topic().equals(LOCATION_SUPPLIER_SOURCE)) {
            log.info("---<< setting timestamp for  >>---");
            String key = record.key();  // throws ClassCastException: [B cannot be cast to String
            log.info("Received Message: timestamp ={}, partition ={}, offset ={}, key = {}, value = {}\n",
                    record.timestamp(), record.partition(), record.offset(), record.key(), record.value().getClass().getName());
        }
    }
    return records;
}

What you describe is a known issue. It's a result of how Kafka Streams works internally:

Because Kafka Streams may read and/or write from/to multiple input/output topics, but uses a single Consumer/Producer for all those topics, Kafka Streams needs to deal with different data types at the same time. For example, a consumer may read two topics, one with types <String,Long> and the other with types <Int,Json>. However, a consumer can only take one deserializer for the key and one for the value. Thus, the consumer cannot be configured with a StringDeserializer or IntegerDeserializer for the key because it would fail for one of the two topics. (The same applies to the value.)

To allow for using a shared consumer, Kafka Streams configures the consumer deserializers as ByteArrayDeserializer to avoid typing issues, and does the deserialization by itself (i.e., outside of the consumer, by inspecting the record metadata, i.e., the topic name, and applying the corresponding deserializer). Thus, interceptors only get raw bytes, because the consumer does not deserialize the records, but just passes the bytes on to Kafka Streams.

Similarly, on the write path, Kafka Streams first serializes the record and only passes <byte[],byte[]> to the producer, which is configured with ByteArraySerializer.

Thus, if you need to access the key and value in an interceptor, you need to apply the corresponding (de)serializer manually, because the type provided to the interceptor is always <byte[], byte[]> in this case.
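To illustrate, here is a minimal sketch of a consumer interceptor that deserialises the raw key bytes itself with a StringDeserializer (the class name and log format are illustrative, not your actual code; the value would need the matching value deserializer instead):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LoggingConsumerInterceptor implements ConsumerInterceptor<byte[], byte[]> {

    // Kafka Streams hands the interceptor raw bytes, so we deserialise manually.
    private final Deserializer<String> keyDeserializer = new StringDeserializer();

    @Override
    public ConsumerRecords<byte[], byte[]> onConsume(ConsumerRecords<byte[], byte[]> records) {
        for (ConsumerRecord<byte[], byte[]> record : records) {
            // Deserialise the key bytes ourselves; the topic name lets a
            // topic-aware deserializer pick the right schema if needed.
            String key = keyDeserializer.deserialize(record.topic(), record.key());
            System.out.printf("topic=%s, partition=%d, offset=%d, key=%s%n",
                    record.topic(), record.partition(), record.offset(), key);
        }
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void close() {
        keyDeserializer.close();
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```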

Hi there,
thanks for that. I did arrive at this conclusion after debugging the app and getting the key/value serialisers from the configure method. However, I haven't been able to deserialise the value; the key deserialises fine. Any other pointers you can share?

Kind regards


"haven't been able to deserialise the value. The key deserialises fine"

Why does it not work the same way for key and value? What is the issue?
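One thing to check: your producer config shows the value is written with SpecificAvroSerde, and an Avro deserializer needs to be configured with a schema registry URL before it can be used. A sketch, assuming `kafka-streams-avro-serde` is on the classpath; `SupplierBatch` stands in for your generated Avro class and the registry URL is an assumption:

```java
import java.util.Map;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;

// SupplierBatch is a placeholder for your generated Avro record class.
SpecificAvroDeserializer<SupplierBatch> valueDeserializer = new SpecificAvroDeserializer<>();

// Without this configure() call the deserializer cannot look up schemas;
// "http://localhost:8081" is an assumed registry address, not from your config.
valueDeserializer.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);

SupplierBatch value = valueDeserializer.deserialize(record.topic(), record.value());
```

If a plain String or Java-serialisation-based deserializer is applied to Avro bytes instead, it will fail, since the bytes carry Confluent's wire-format prefix rather than a plain payload.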

I have since resolved the issue. Thanks for reaffirming my earlier thinking; I had abandoned the byte[] train of thought when I ran into 'invalid stream header' errors.