Two questions about implementing org.springframework.kafka.support.serializer.JsonTypeResolver

I'm trying to implement the resolveType method of Spring Kafka's JsonTypeResolver, but I have run into two issues.

How do I register my class so that Kafka will use it when deserializing?

How do I use the JavaType class to specify the class I want the message deserialized into?

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.serializer.JsonTypeResolver;

import com.fasterxml.jackson.databind.JavaType;

public class OgTestConsumerResolver implements JsonTypeResolver {
    @Override
    public JavaType resolveType(String topic, byte[] data, Headers headers) {
        if (StringUtils.isNotEmpty(topic) && topic.equals("og.test.event.with.domain.object")) {
            // incomplete: this is where I don't know how to build the JavaType
            return new JavaTyp
        }
        return null;
    }
}

We were able to get the functionality we wanted by:

Setting the TYPE_ID_MAPPINGS property on the consumer properties:

	// format is similar to JsonDeserializer.TYPE_MAPPINGS - comma separated list of key:value
	// for each pair, key is type id used by message producer, value is type id used by message consumer
	props.put(KafkaCustomJsonDeserializer.TYPE_ID_MAPPINGS, kp.getConsumerTypeMappings());

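The value returned by kp.getConsumerTypeMappings() is a Map<String, String>: each key is the type id (class name) the producer sends, and each value is the class the consumer should deserialize into.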
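If the mappings start out as a comma-separated key:value string, as the comment in the snippet describes, they need to be turned into a Map<String, String> before the custom deserializer can cast them. A minimal sketch of that conversion follows. TypeIdMappingParser is a hypothetical helper, not part of spring-kafka, and the class names used in the usage note are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of spring-kafka): turns
// "producerTypeId:consumerTypeId,producerTypeId:consumerTypeId" into a map.
final class TypeIdMappingParser {
    private TypeIdMappingParser() {
    }

    static Map<String, String> parse(String mappings) {
        // LinkedHashMap keeps the pairs in the order they were configured
        Map<String, String> result = new LinkedHashMap<>();
        if (mappings == null || mappings.trim().isEmpty()) {
            return result;
        }
        for (String pair : mappings.split(",")) {
            // split on the first ':' only; class names contain no ':' themselves
            String[] parts = pair.split(":", 2);
            if (parts.length != 2 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) {
                throw new IllegalArgumentException("Expected key:value but got: " + pair);
            }
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }
}
```

For example, TypeIdMappingParser.parse("com.producer.Order:com.consumer.OrderDto") yields a single-entry map from com.producer.Order to com.consumer.OrderDto, which could then be put on the consumer properties under TYPE_ID_MAPPINGS.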

Then extending the Spring Kafka JsonDeserializer like so:

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class KafkaCustomJsonDeserializer<T> extends JsonDeserializer<T> {
	public static final String TYPE_ID_MAPPINGS = "com.test.kafka.typeid.mappings";
	private static final String TYPE_ID_HEADER = "__TypeId__";
	// empty by default so deserialize() never sees a null map
	private Map<String, String> typeIdMap = Collections.emptyMap();

	@Override
	@SuppressWarnings("unchecked")
	public synchronized void configure(Map<String, ?> configs, boolean isKey) {
		// mappings apply to record values only; keep the empty default if none are configured
		Object mappings = configs.get(TYPE_ID_MAPPINGS);
		if (!isKey && mappings instanceof Map) {
			typeIdMap = (Map<String, String>) mappings;
		}

		super.configure(configs, isKey);
	}

	@Override
	public T deserialize(String topic, Headers headers, byte[] data) {
		Headers customHeaders = null;
		// replace type id header if replacement is configured
		if (!typeIdMap.isEmpty() && headers != null) {
			// should be only one type id header; lastHeader returns null if it is absent
			Header header = headers.lastHeader(TYPE_ID_HEADER);
			if (header != null && header.value() != null) {
				String typeId = new String(header.value(), StandardCharsets.UTF_8);
				String customTypeId = typeIdMap.get(typeId);
				if (customTypeId != null) {
					// work on a copy so the headers passed in are not modified
					customHeaders = new RecordHeaders(headers.toArray());
					customHeaders.remove(TYPE_ID_HEADER);
					customHeaders.add(TYPE_ID_HEADER, customTypeId.getBytes(StandardCharsets.UTF_8));
				}
			}
		}

		return super.deserialize(topic, customHeaders != null ? customHeaders : headers, data);
	}
}

This lets us configure the Kafka listener to deserialize incoming messages into whichever consumer-side classes we choose, based on the type id the producer sent.
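The core of the approach is a lookup on the bytes of the __TypeId__ header, and that part can be exercised without a broker or any Kafka classes. The sketch below isolates it. TypeIdRemapper is a hypothetical name introduced only for illustration, and it assumes the header value is UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper: isolates the type id lookup so it can be tested on its own.
final class TypeIdRemapper {
    private TypeIdRemapper() {
    }

    // Returns the replacement type id as UTF-8 bytes, or the original
    // array unchanged when there is no header value or no mapping for it.
    static byte[] remap(byte[] headerValue, Map<String, String> typeIdMap) {
        if (headerValue == null || typeIdMap == null) {
            return headerValue;
        }
        String typeId = new String(headerValue, StandardCharsets.UTF_8);
        String replacement = typeIdMap.get(typeId);
        return replacement != null
                ? replacement.getBytes(StandardCharsets.UTF_8)
                : headerValue;
    }
}
```

With a map containing com.producer.Order -> com.consumer.OrderDto (made-up names), passing the bytes of "com.producer.Order" returns the bytes of "com.consumer.OrderDto", while any unmapped type id comes back untouched.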
