cross-post from here: link
tl;dr: How can I implement a Kafka Connect Converter that uses headers when running under Confluent Replicator?
I have written a custom Kafka Connect Converter. As I understand it, the toConnectData method is used when deserializing messages.
The interface declares two overloads of this method. The second takes Headers and is documented as the one the Connect runtime calls, while the first exists for backwards compatibility.
The two overloads (shown here for fromConnectData; toConnectData has the same pair, with and without Headers):
byte[] fromConnectData(String topic, Schema schema, Object value);
byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value);
Interface ref:
In practice, I find that the first (header-less) overload is called instead, and my use case needs the headers in order to decrypt the payload.
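To make the overload behaviour concrete, here is a minimal, self-contained sketch. It does not use the real Kafka classes: MiniConverter, HeaderOnlyConverter, the "key-id" header, and the Map used in place of Headers are all hypothetical stand-ins. As far as I know, the real Converter interface implements the header-aware overload as a default method that falls back to the header-less one, and the sketch mirrors that. The point is that overriding the header-aware overload only helps if the caller actually invokes it. A caller that invokes the two-argument overload directly never reaches the override.

```java
import java.util.Map;

public class OverloadDispatchDemo {

    /** Hypothetical stand-in for the read side of Kafka Connect's Converter. */
    public interface MiniConverter {
        String toConnectData(String topic, byte[] value);

        // Assumed to mirror the real interface: the header-aware overload is a
        // default method that delegates to the header-less one.
        default String toConnectData(String topic, Map<String, String> headers, byte[] value) {
            return toConnectData(topic, value);
        }
    }

    /** A converter that can only work when headers are supplied. */
    public static class HeaderOnlyConverter implements MiniConverter {
        @Override
        public String toConnectData(String topic, byte[] value) {
            throw new IllegalStateException("headers not supplied");
        }

        @Override
        public String toConnectData(String topic, Map<String, String> headers, byte[] value) {
            return "decoded with key id " + headers.get("key-id");
        }
    }

    /** A caller that passes headers reaches the header-aware override. */
    public static String callWithHeaders(MiniConverter converter) {
        return converter.toConnectData("t", Map.of("key-id", "k1"), new byte[0]);
    }

    /** A caller that uses the two-argument overload never reaches that override. */
    public static String callWithoutHeaders(MiniConverter converter) {
        try {
            return converter.toConnectData("t", new byte[0]);
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        MiniConverter converter = new HeaderOnlyConverter();
        System.out.println(callWithHeaders(converter));
        System.out.println(callWithoutHeaders(converter));
    }
}
```

So which overload runs is decided entirely by the code that invokes the converter, not by anything the converter itself can configure.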
Example Converter implementation
package com.example;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class ExampleConverter implements Converter {
    ...

    // Header-less overload: cannot work for this use case, so fail loudly.
    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        throw new RuntimeException("headers not supplied, these are required in order to decrypt");
    }

    // Header-aware overload: the one expected to be called (placeholder body).
    @Override
    public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
    }
}
I run this converter in Confluent's Connect container image, confluentinc/cp-enterprise-replicator:7.7.0.
I get the following error, which clearly indicates that the older (deprecated?) header-less overload is being called:
java.lang.RuntimeException: headers not supplied, these are required in order to decrypt
at com.example.ExampleConverter.toConnectData(ExampleConverter.java:50)
at io.confluent.connect.replicator.ReplicatorSourceTask.convertKeyValue(ReplicatorSourceTask.java:637)
at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:536)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:488)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:360)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
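Reading the trace from the top, the frame directly beneath the converter is ReplicatorSourceTask.convertKeyValue, so the header-less call appears to come from the connector's own task code rather than from the Connect worker classes further down. If it helps when debugging, a converter can report its direct caller in the exception message. The sketch below uses only Throwable.getStackTrace() from the JDK. CallerProbe and its method names are hypothetical, and legacyOverload and connectorStyleCall merely stand in for the header-less toConnectData and for whatever code invokes it.

```java
public class CallerProbe {

    /** Returns "Class.method" of whoever invoked the method that calls this helper. */
    public static String directCaller() {
        StackTraceElement[] frames = new Throwable().getStackTrace();
        // frames[0] is directCaller itself, frames[1] is the probed method,
        // frames[2] is the code that invoked the probed method.
        if (frames.length < 3) {
            return "<unknown>";
        }
        return frames[2].getClassName() + "." + frames[2].getMethodName();
    }

    /** Hypothetical stand-in for the header-less toConnectData overload. */
    public static String legacyOverload() {
        return "header-less overload invoked by " + directCaller();
    }

    /** Hypothetical stand-in for connector code that calls the converter itself. */
    public static String connectorStyleCall() {
        return legacyOverload();
    }

    public static void main(String[] args) {
        System.out.println(connectorStyleCall());
    }
}
```

In a real converter the same string could be appended to the RuntimeException message, which would make it obvious in the logs which component chose the header-less overload.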
Seeking advice — am I doing something wrong?