SFTP Connector classpath problem

Hi all,

Creating a custom transformer for transforming XML files into AVRO foramt for being transversal among other connectors when I use it in SFTP Connector (version 3.1.10) there are some classpath problems between AvroConverter (using the kafka-connect-avro-converter version 7.3.0) and the connector versions:

[ERROR] 2023-02-22 12:06:46,817 [DistributedHerder-connect-1-1] org.apache.kafka.connect.runtime.Worker startTask - Failed to start task source.sftp.sales.topic-0
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.KafkaException: class io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy
	at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:320)
	at org.apache.kafka.connect.runtime.Worker$SourceTaskBuilder.doBuild(Worker.java:1649)
	at org.apache.kafka.connect.runtime.Worker$TaskBuilder.build(Worker.java:1507)
	at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:712)
	at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:790)
	at org.apache.kafka.connect.runtime.Worker.startSourceTask(Worker.java:610)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1760)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$doRestartConnectorAndTasks$30(DistributedHerder.java:1455)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.doRestartConnectorAndTasks(DistributedHerder.java:1453)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$processRestartRequests$27(DistributedHerder.java:1396)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.processRestartRequests(DistributedHerder.java:1394)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:451)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:348)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: class io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:405)
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:436)
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:421)
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.subjectNameStrategyInstance(AbstractKafkaSchemaSerDeConfig.java:287)
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.keySubjectNameStrategy(AbstractKafkaSchemaSerDeConfig.java:265)
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.configureClientProperties(AbstractKafkaSchemaSerDe.java:82)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.configure(AbstractKafkaAvroSerializer.java:66)
	at io.confluent.connect.avro.AvroConverter$Serializer.<init>(AvroConverter.java:145)
	at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:77)
	at com.dufry.kafka.connect.transformer.BaseRecordTransformation.configure(BaseRecordTransformation.java:159)
	at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:300)
	... 19 more

Digging a bit into Kafka connect and installed dependency versions I realized that SFTP library versions are outdated (versions 5.5.3 for kafka-avro-serializer and kafka-connect-avro-converter between others it its lib folder).

In the custom transformer there are not any compiled dependency (all the dependencies are injected through classpath for avoiding this kind of problems) so the error is between SFTP connector version and Kafka connect version.

Also having a look about that class (TopicNameStrategy) is an implementation from two interfaces (one deprecated and one not) and probably that is the main issue (the outdated library probably just implements one of them).

There is any workaround or even better an expected dependency upgrading for this connector?

Thanks in advance.

Hello @jorgebg90 and welcome!

Thank you for raising this issue. I agree with you I think that dependency upgrade is the best way to go. I’ve opened a pull request to do this and will follow up with the Connect team to get that reviewed and have a new version of the connector released.

Unfortunately the source code isn’t public so you wouldn’t be able to build it to test it out, and I’m not seeing an easy way to do jar surgery, so for now, hang tight I’ll post an update here once the connector is available.

Thanks!
Dave

Hello Dave,

Thanks for the quick response, then I’ll look forward for that version upgrade to use the custom transformer into the SFTP connector.

Regards,
Jorge

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.