Avro Converter with custom Topic name strategy and sink connectors

There’s a bunch of confusion around this at least on Stack Overflow.

I want to use the Avro converter in a sink connector with a custom naming strategy.

The documentation (Using Kafka Connect with Schema Registry | Confluent Documentation) says:

Note that when added to the worker or connector configuration, these properties require the key.converter. and value.converter. prefix.

which means that to override the subject name strategy, one must use

value.converter.value.subject.name.strategy.
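For example, for a sink connector, pointing the value converter at the built-in TopicRecordNameStrategy would look something like this (a sketch only; the Schema Registry URL is a placeholder):

  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"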

Now, when we check SO, we see the following references:

  1. avro - Kafka Connect not working with Subject Strategies - Stack Overflow, where it says the same thing

  2. Comments in Kafka connector and Schema Registry - Error Retrieving Avro Schema - Subject not found - Stack Overflow, saying to use value.subject.name.strategy instead of the original value.converter.value.subject.name.strategy

I also understand that in 4.1.3 there was a bug that prevented this from working fully; see CC-1902 : pass all configs to underlying Serializer/deserializer by mageshn · Pull Request #801 · confluentinc/schema-registry · GitHub

I haven’t seen the recommended value.converter.value.subject.name.strategy work, but I also don’t understand why value.subject.name.strategy should work, since the subject name strategy is a config override for the converter.

Correction: value.subject.name.strategy does nothing; that ends up using the default strategy instead, which is <topic>-value for the value and <topic>-key for the key (so for test-topic, the value schema is looked up under the subject test-topic-value).

Hi @bhaveshraheja, can you maybe rephrase your question with what you’re trying to do and what’s not actually working for you (and include full details: the config that you’re using, log files from the converter, etc.)?

The answer on SO is unambiguous and upvoted plenty of times, indicating it worked for lots of people :slight_smile:

Correct, nothing that I’ve seen, including in those linked articles, suggests that value.subject.name.strategy alone will do anything.

The answer on SO is unambiguous and upvoted plenty of times, indicating it worked for lots of people :slight_smile:

Thanks @rmoff. I figured that would be the best thing to reference, and used that.

I dug a little deeper, and it looks like the issue comes from implementing a custom strategy and getting the Connect worker to find it.

The error I am receiving is along the lines of:

is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy

This is similar to issues mentioned on this mailing list as well.

As per cannot use TopicRecordNameStrategy · Issue #825 · confluentinc/schema-registry · GitHub, that was a class-loader problem, but I am not able to nail down exactly where it’s bubbling up here. Any help in solving this would be appreciated.

My current version is:

kafka-topics --version
5.4.3-ccs (Commit:0576c4b7e407675e)

I double-checked the class & path, and given I am not getting a ClassNotFoundException, there seems to be something else at play.

In the code, I double-checked that I have

import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;

with

public class MyStrategy implements SubjectNameStrategy
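Roughly, the skeleton is like the following (a sketch only: the suffix logic and the my.strategy.suffix property are made up for illustration, and the subjectName signature shown is the one from newer Schema Registry versions that take a ParsedSchema, so the class has to be compiled against the same serializer version that the worker/converter actually loads):

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;

import java.util.Map;

public class MyStrategy implements SubjectNameStrategy {

  // Illustrative only: suffix appended to the topic name for value subjects.
  private String suffix = "-value";

  @Override
  public void configure(Map<String, ?> configs) {
    // The serializer/converter configs are passed in here, so custom
    // properties can be read if needed (this property name is hypothetical).
    Object s = configs.get("my.strategy.suffix");
    if (s != null) {
      suffix = s.toString();
    }
  }

  @Override
  public String subjectName(String topic, boolean isKey, ParsedSchema schema) {
    // Keys keep the default <topic>-key naming; values get <topic><suffix>.
    return isKey ? topic + "-key" : topic + suffix;
  }
}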

In Kafka Connect, the configuration being passed is

"value.converter.value.subject.name.strategy": "MyStrategy"

What’s the full connector config that you’re using? And from your Kafka Connect worker log what’s the full error details?

Config

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.region": "ap-southeast-1",
  "partition.duration.ms": "1800000",
  "topics.dir": "<path>",
  "schema.compatibility": "NONE",
  "flush.size": "1000000",
  "timezone": "UTC",
  "tasks.max": "2",
  "s3.part.size": "5242880",
  "value.converter.value.subject.name.strategy": "MyStrategy",
  "topics.regex": "test-topic",
  "locale": "en",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "name": "warehouse-sink-test-topic",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "path.format": "'cd'=YYYY-MM-dd/'hr'=HH",
  "timestamp.extractor": "Wallclock",
  "s3.bucket.name": "<CustomConfig>",
  "rotate.schedule.interval.ms": "180000"
}

Full trace

java.lang.RuntimeException: MyStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy
        at io.confluent.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:207)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.subjectNameStrategyInstance(AbstractKafkaAvroSerDeConfig.java:199)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.valueSubjectNameStrategy(AbstractKafkaAvroSerDeConfig.java:181)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.configureClientProperties(AbstractKafkaAvroSerDe.java:68)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.configure(AbstractKafkaAvroSerializer.java:44)
        at io.confluent.connect.avro.AvroConverter$Serializer.<init>(AvroConverter.java:127)
        at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:72)
        at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:293)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:444)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1186)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:126)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1201)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1197)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2021-02-22 14:22:25,020] ERROR [pool-39-thread-3] [Worker clientId=connect-1, groupId=branch-connect-cluster] Couldn't instantiate task warehouse-sink-test-topic-0 because it has an invalid task configuration. This task will not execute until reconfigured. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
java.lang.NullPointerException
        at org.apache.kafka.connect.runtime.Worker$ConnectorStatusMetricsGroup.recordTaskRemoved(Worker.java:917)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:476)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1186)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:126)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1201)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1197)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Where is your MyStrategy class deployed? In the connector’s plugin.path, or on the worker classpath? I think the S3 sink connector bundles a jar containing io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy, which might be a different version than what the Connect worker has, perhaps causing an issue?
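For context, with a confluent-hub style install the relevant pieces look something like this (paths and version numbers are illustrative):

# Connect worker config
plugin.path=/usr/share/confluent-hub-components

# Each directory under plugin.path gets its own plugin class loader, e.g.
/usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib/
    kafka-connect-s3-<version>.jar
    ... (the connector's bundled dependency jars)

Jars on the worker’s own classpath (e.g. share/java) are loaded by a separate class loader, which is how two copies of the same interface can end up in play.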


Interesting. I didn’t consider that, @mikebin.

It’s currently deployed under plugin.path, but what you are mentioning may be at play here.
I think around 5.2/5.3 (I can’t find the exact change), the connectors stopped shipping with the Confluent libs for Connect, so I have to move towards a confluent-hub based installation.

I tried installing via the hub

confluent-hub install confluentinc/kafka-connect-s3:latest

and it seems to throw a different error now. I will spend some time working on that. If I find any other updates/insights, I will post back here.

Thanks for all the nudges & help.
