Replicator not passing Headers into Converter#fromConnectData

cross-post from here: link

tl;dr: how can I implement a Kafka Connect Converter that uses headers, when using Confluent Replicator?

I have written a custom Kafka Connect Converter, and as I understand it, toConnectData is the method used when deserializing messages.

Each method in the interface has two overloads: the second takes Headers and is documented as the one the Connect system calls, while the first exists for backwards compatibility.

The two fromConnectData overloads:

    byte[] fromConnectData(String topic, Schema schema, Object value);
    byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value);

Interface ref:

In practice, however, I am finding that the first overload is the one being called, and for my use case I need the headers in order to perform the conversion.
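
For context on why the legacy overload can still end up being the one that runs: the Headers variants are default methods in org.apache.kafka.connect.storage.Converter that simply delegate to the legacy signatures, so callers written before headers existed can keep calling the old methods. A rough sketch of that delegation (paraphrased from memory of the interface, not copied from any specific Kafka version):

    public interface Converter {
        // configure(...) omitted for brevity

        byte[] fromConnectData(String topic, Schema schema, Object value);

        // Default method: delegates to the legacy overload, ignoring headers.
        default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
            return fromConnectData(topic, schema, value);
        }

        SchemaAndValue toConnectData(String topic, byte[] value);

        // Default method: delegates to the legacy overload, ignoring headers.
        default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
            return toConnectData(topic, value);
        }
    }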

Example Converter implementation

    package com.example;

    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.Converter;

    public class ExampleConverter implements Converter {

        ...

        // Legacy overload: fail fast, since decryption is impossible without headers.
        @Override
        public SchemaAndValue toConnectData(String topic, byte[] value) {
            throw new RuntimeException("headers not supplied, these are required in order to decrypt");
        }

        // Headers overload: the one I expect the Connect framework to call (stubbed here).
        @Override
        public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
            return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
        }

    }
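
As a quick sanity check outside of Connect, a snippet along these lines (my own illustration, not from the original post) exercises both code paths directly, using RecordHeaders as a concrete Headers implementation:

    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.connect.storage.Converter;

    public class ExampleConverterCheck {
        public static void main(String[] args) {
            Converter converter = new ExampleConverter();
            Headers headers = new RecordHeaders();

            // Headers overload: returns the stubbed SchemaAndValue, no exception.
            System.out.println(converter.toConnectData("example-topic", headers, new byte[0]));

            // Legacy overload: throws the "headers not supplied" RuntimeException,
            // which is the same path Replicator hits below.
            System.out.println(converter.toConnectData("example-topic", new byte[0]));
        }
    }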

I run this converter using Confluent’s Connect container image confluentinc/cp-enterprise-replicator:7.7.0.

I get the following error, which clearly shows that the older (non-Headers) overload is the one being called:

java.lang.RuntimeException: headers not supplied, these are required in order to decrypt
    at com.example.ExampleConverter.toConnectData(ExampleConverter.java:50)
    at io.confluent.connect.replicator.ReplicatorSourceTask.convertKeyValue(ReplicatorSourceTask.java:637)
    at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:536)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:488)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:360)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Seeking advice: am I doing something wrong?

Official response from Confluent, confirming that Replicator does not use the Headers overload:

You are right, the Converter interface has two overloads for each of fromConnectData() and toConnectData():

    byte[] fromConnectData(String topic, Schema schema, Object value);
    byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value);
    SchemaAndValue toConnectData(String topic, byte[] value);
    SchemaAndValue toConnectData(String topic, Headers headers, byte[] value);

Replicator calls fromConnectData() and then toConnectData() without using Headers, which is why toConnectData(String topic, Headers headers, byte[] value) is never called. If you want to perform custom conversion on headers, you can set header.converter=com.example.ExampleConverter instead.
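
For what it's worth, a class used as header.converter also has to implement the separate org.apache.kafka.connect.storage.HeaderConverter interface. A minimal sketch of what that would add to ExampleConverter (my own sketch, assuming the standard HeaderConverter contract, not something Confluent provided):

    // Additional imports assumed: org.apache.kafka.common.config.ConfigDef,
    // org.apache.kafka.connect.storage.HeaderConverter
    public class ExampleConverter implements Converter, HeaderConverter {

        // HeaderConverter: called once per header when deserializing a record.
        @Override
        public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
            return new SchemaAndValue(Schema.BYTES_SCHEMA, value);
        }

        // HeaderConverter: called once per header when serializing a record.
        @Override
        public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
            return (byte[]) value;
        }

        @Override
        public ConfigDef config() {
            return new ConfigDef();
        }

        @Override
        public void close() {
            // nothing to clean up
        }

        // ... configure(...) and the Converter methods as shown above ...
    }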
