We were able to get the functionality we wanted by:

Setting the TYPE_ID_MAPPINGS property on the consumer configuration:

// format is similar in spirit to JsonDeserializer.TYPE_MAPPINGS - a comma-separated list of key:value pairs,
// where each key is the type id written by the message producer and each value is the type id
// (target class name) the consumer should use instead
props.put(KafkaCustomJsonDeserializer.TYPE_ID_MAPPINGS, kp.getConsumerTypeMappings());

Internally this ends up as a String-to-String map from the producer's class name to the consumer's class name.
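The exact shape of kp.getConsumerTypeMappings() isn't shown here, but a comma-separated key:value string in the format described above could be parsed into such a map roughly like this (class names and the parser itself are illustrative, not part of the original code):

```java
import java.util.HashMap;
import java.util.Map;

public class TypeIdMappingParser {

    // parse "producerTypeId:consumerTypeId,..." into a lookup map
    public static Map<String, String> parse(String mappings) {
        Map<String, String> result = new HashMap<>();
        if (mappings == null || mappings.isEmpty()) {
            return result;
        }
        for (String pair : mappings.split(",")) {
            // limit 2 so class names containing no colon split cleanly into key and value
            String[] kv = pair.split(":", 2);
            if (kv.length == 2) {
                result.put(kv[0].trim(), kv[1].trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> map = parse(
            "com.producer.OrderEvent:com.consumer.OrderEvent, com.producer.UserEvent:com.consumer.UserEvent");
        System.out.println(map.get("com.producer.OrderEvent"));
        System.out.println(map.size());
    }
}
```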
Then extending Spring Kafka's JsonDeserializer like so:
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class KafkaCustomJsonDeserializer<T> extends JsonDeserializer<T> {

    public static final String TYPE_ID_MAPPINGS = "com.test.kafka.typeid.mappings";
    private static final String TYPE_ID_HEADER = "__TypeId__";

    private Map<String, String> typeIdMap = new HashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public synchronized void configure(Map<String, ?> configs, boolean isKey) {
        if (!isKey) {
            Object mappings = configs.get(TYPE_ID_MAPPINGS);
            // accept either a pre-built Map or a comma-separated "key:value" string;
            // leave the map empty if the property was not configured (avoids an NPE later)
            if (mappings instanceof Map) {
                typeIdMap = (Map<String, String>) mappings;
            } else if (mappings instanceof String) {
                for (String pair : ((String) mappings).split(",")) {
                    String[] kv = pair.split(":", 2);
                    if (kv.length == 2) {
                        typeIdMap.put(kv[0].trim(), kv[1].trim());
                    }
                }
            }
        }
        super.configure(configs, isKey);
    }

    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        Headers customHeaders = null;
        // replace the type id header if a replacement is configured for it
        if (!typeIdMap.isEmpty() && headers != null) {
            // there should be only one type id header
            Header typeIdHeader = headers.lastHeader(TYPE_ID_HEADER);
            if (typeIdHeader != null && typeIdHeader.value() != null) {
                String typeId = new String(typeIdHeader.value(), StandardCharsets.UTF_8);
                String customTypeId = typeIdMap.get(typeId);
                if (customTypeId != null) {
                    // copy the record headers and swap in the mapped type id
                    customHeaders = new RecordHeaders(headers.toArray());
                    customHeaders.remove(TYPE_ID_HEADER);
                    customHeaders.add(TYPE_ID_HEADER, customTypeId.getBytes(StandardCharsets.UTF_8));
                }
            }
        }
        return super.deserialize(topic, customHeaders != null ? customHeaders : headers, data);
    }
}
This enabled us to configure the Kafka listener to deserialize incoming messages into whatever local classes we want, regardless of the type id the producer wrote.
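For completeness, a minimal sketch of how the consumer configuration might be wired up. The standard Kafka property names ("bootstrap.servers", "value.deserializer", etc.) are real; the deserializer's package, the broker address, and the mapped class names are illustrative assumptions:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerConfigSketch {

    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // use the custom deserializer instead of the stock JsonDeserializer
        // (package is assumed; match it to wherever KafkaCustomJsonDeserializer lives)
        props.put("value.deserializer", "com.test.kafka.KafkaCustomJsonDeserializer");

        // map the producer's type ids to the consumer's local classes (illustrative names)
        Map<String, String> typeIdMappings = new HashMap<>();
        typeIdMappings.put("com.producer.OrderEvent", "com.consumer.OrderEvent");
        props.put("com.test.kafka.typeid.mappings", typeIdMappings);
        return props;
    }
}
```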