I haven’t seen the recommended value.converter.value.subject.name.strategy work, but I don’t understand why value.subject.name.strategy should work either, since it lacks the prefix used for config overrides passed to the converter.
Correction: value.subject.name.strategy does nothing; you end up with the default strategy instead, which is &lt;topic&gt;-value for values and &lt;topic&gt;-key for keys (e.g. for a topic named orders, the subjects would be orders-value and orders-key).
Hi @bhaveshraheja, can you maybe rephrase your question to say what you’re trying to do and what’s not actually working for you (and include full details: the config you’re using, log output from the converter, etc.)?
The answer on SO is unambiguous and has been upvoted plenty of times, which suggests it worked for lots of people.
Correct, nothing that I’ve seen, including in those linked articles, suggests that value.subject.name.strategy alone will do anything.
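For reference, here’s a minimal sketch of what the converter-prefixed override looks like in a connector config. The connector name, connector class, and registry URL are placeholders; the point is only that the strategy property must carry the value.converter. prefix so it reaches the converter:

```json
{
  "name": "example-avro-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "orders",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy"
  }
}
```

Without the value.converter. prefix, the worker never hands the property to the AvroConverter, which matches the “does nothing” behavior described above.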
java.lang.RuntimeException: MyStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy
at io.confluent.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:207)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.subjectNameStrategyInstance(AbstractKafkaAvroSerDeConfig.java:199)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.valueSubjectNameStrategy(AbstractKafkaAvroSerDeConfig.java:181)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.configureClientProperties(AbstractKafkaAvroSerDe.java:68)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.configure(AbstractKafkaAvroSerializer.java:44)
at io.confluent.connect.avro.AvroConverter$Serializer.<init>(AvroConverter.java:127)
at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:72)
at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:293)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:444)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1186)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:126)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1201)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1197)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2021-02-22 14:22:25,020] ERROR [pool-39-thread-3] [Worker clientId=connect-1, groupId=branch-connect-cluster] Couldn't instantiate task warehouse-sink-test-topic-0 because it has an invalid task configuration. This task will not execute until reconfigured. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
java.lang.NullPointerException
at org.apache.kafka.connect.runtime.Worker$ConnectorStatusMetricsGroup.recordTaskRemoved(Worker.java:917)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:476)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1186)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:126)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1201)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1197)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Where is your MyStrategy class deployed — on the connector’s plugin.path, or on the worker classpath? I think the S3 sink connector bundles a jar containing io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy, which might be a different version than what the Connect worker has; if the interface is loaded twice through different classloaders, a class can fail an instanceof check even though it implements the interface, which would explain the error above.
It’s deployed under plugin.path at the moment, but what you’re describing may well be at play here.
I think around 5.2/5.3 (I can’t find the exact change), the connectors stopped shipping with the Confluent libs for Connect, so you have to move towards a confluent-hub based installation.
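As an illustration of that installation route, a confluent-hub based install of the S3 sink might look like the sketch below. The component directory is an example path — it just needs to be a directory the worker’s plugin.path already covers — and the connector version tag is a placeholder:

```shell
# Illustrative only: install the S3 sink connector with the confluent-hub CLI,
# placing it in a directory that is listed on the worker's plugin.path.
confluent-hub install confluentinc/kafka-connect-s3:latest \
  --component-dir /usr/share/confluent-hub-components \
  --no-prompt
```

Installing this way keeps the connector’s bundled jars isolated under its own plugin directory, rather than mixing them onto the worker classpath where version clashes like the one above can occur.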