Hello,
I ran into a serdes issue where Kafka Streams is not able to deserialize Kafka messages back into the model class.
It throws the following error:
Exception in thread "radius-activity-ed509817-da62-4817-bb77-f7c265609d2b-StreamThread-1" java.lang.ClassCastException: class com.mii.kafka.communicator.KafkaHDCommunication cannot be cast to class java.lang.String (com.mii.kafka.communicator.KafkaHDCommunication is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
Kafka producer configuration:
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaHDCommunicationSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaHDCommunicationSerializer.class);
Kafka Streams configuration:
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "radius-activity");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomSerde.class.getName());
I created CustomSerde based on examples from some forums:
public class CustomSerde extends WrapperSerde<KafkaHDCommunication> {
    public CustomSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(KafkaHDCommunication.class));
    }
}
Questions:
- How do I use the custom class KafkaHDCommunication with a serde in Kafka Streams?
- Can I reuse the KafkaHDCommunicationSerializer and KafkaHDCommunicationDeserializer that I already created for the producer?
- Is there a link that clearly explains a producer and Kafka Streams example with custom serdes?
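
To make the second question concrete, here is a minimal plain-Java sketch of pairing an existing serializer and deserializer into one serde. It is only an illustration under stated assumptions: the Serializer, Deserializer and Serde interfaces below are stand-ins that mirror the shape of the Kafka interfaces but are not the Kafka classes, and Communication is a hypothetical one-field model, since the real KafkaHDCommunication class is not shown above. In Kafka itself, Serdes.serdeFrom(serializer, deserializer) offers this kind of pairing.

```java
import java.nio.charset.StandardCharsets;

// Stand-ins mirroring the shape of Kafka's Serializer, Deserializer and Serde.
// These are NOT the Kafka classes; they only keep the sketch self-contained.
interface Serializer<T> { byte[] serialize(String topic, T data); }
interface Deserializer<T> { T deserialize(String topic, byte[] data); }
interface Serde<T> {
    Serializer<T> serializer();
    Deserializer<T> deserializer();
}

// Hypothetical model with a single field (assumption: the real class is not shown).
final class Communication {
    final String payload;
    Communication(String payload) { this.payload = payload; }
}

// Plays the role of the serializer already written for the producer.
final class CommunicationSerializer implements Serializer<Communication> {
    public byte[] serialize(String topic, Communication data) {
        return data == null ? null : data.payload.getBytes(StandardCharsets.UTF_8);
    }
}

// Plays the role of the matching deserializer.
final class CommunicationDeserializer implements Deserializer<Communication> {
    public Communication deserialize(String topic, byte[] data) {
        return data == null ? null : new Communication(new String(data, StandardCharsets.UTF_8));
    }
}

final class SerdeSketch {
    // Pair an existing serializer and deserializer of the same type into one serde.
    static <T> Serde<T> serdeFrom(Serializer<T> s, Deserializer<T> d) {
        return new Serde<T>() {
            public Serializer<T> serializer() { return s; }
            public Deserializer<T> deserializer() { return d; }
        };
    }

    // Serialize a value and read it back through the same serde.
    static String roundTrip(String payload) {
        Serde<Communication> serde =
            serdeFrom(new CommunicationSerializer(), new CommunicationDeserializer());
        byte[] bytes = serde.serializer().serialize("demo-topic", new Communication(payload));
        return serde.deserializer().deserialize("demo-topic", bytes).payload;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

The point of the sketch is that both halves of the serde are typed to the same model class, so a value written by the serializer comes back as that class rather than as a String or raw bytes.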