Hello everyone, I’m having a particularly ‘nigglesome’ problem with interceptors I desperately need help with.
I’m using ProducerInterceptor and ConsumerInterceptor within a spring cloud streams application and have configured them via a StreamsBuilderFactoryConfig bean, they are firing as expected and I can retrieve some attributes of the Producer and Consumer Records, there is however a problem retrieving the actual ‘key’ and ‘value’ of these records. I’ve tried to assign/cast it to the expected value but I get a class cast exception. My initial instincts suggest this might be a serialisation/deserialisation issue. There isn’t however any indication of this elsewhere as the application works fine. The consumer binder has a default key/value serdes set as :
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
Whilst. The producer binder has a key/value serdes set as :
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
Example output I get printing out the key/value from the Consumer and producer record is :
ProducerRecord(topic=.supplier.batch.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=[B@1089613d, value=[B@4a4e10fd, timestamp=1623572275792
Received Message timestamp =1623571603723, partition =3, offset = 92, key = [B@49e41727, value = [B
The following exception is thrown when trying to assign the key to a String instance :
Error executing interceptor onConsume callback
java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap’)
I’ve copied the ‘onConsume’ method of the consumer interceptor to provide a little more detail
@Override
public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> records) {
log.info("---<< ProducerRecord being received {} >>---", records);
for (ConsumerRecord<String, Object> record : records) {
if (record.topic().equals(LOCATION_SUPPLIER_SOURCE)) {
log.info("---<< setting timestamp for >>---");
String key = record.key();
log.info("Received Message: timestamp ={}, partition ={}, offset ={}, key = {}, value = {}\n",
record.timestamp(), record.partition(), record.offset(), record.key(), record.value().getClass().getName());
}
}
return records;
}