Sftp sink connector returns Failed to open new SFTP channel with existing session
Error:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to open new SFTP channel with existing session.
at io.confluent.connect.sftp.connection.SftpConnection.newChannelFromSession(SftpConnection.java:108)
at io.confluent.connect.sftp.sink.storage.SftpOutputStream.createPath(SftpOutputStream.java:48)
at io.confluent.connect.sftp.sink.storage.SftpOutputStream.<init>(SftpOutputStream.java:44)
at io.confluent.connect.sftp.sink.storage.SftpSinkStorage.create(SftpSinkStorage.java:87)
at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriter.<init>(CsvRecordWriter.java:32)
at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriterProvider.getRecordWriter(CsvRecordWriterProvider.java:29)
at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriterProvider.getRecordWriter(CsvRecordWriterProvider.java:12)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.getWriter(TopicPartitionWriter.java:408)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:454)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:250)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.writePartitionWhenPaused(TopicPartitionWriter.java:208)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.executeState(TopicPartitionWriter.java:177)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.write(TopicPartitionWriter.java:158)
at io.confluent.connect.sftp.sink.SftpSinkTask.put(SftpSinkTask.java:154)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
... 10 more
Caused by: com.jcraft.jsch.JSchException: java.io.IOException: inputstream is closed
at com.jcraft.jsch.ChannelSftp.start(ChannelSftp.java:315)
at com.jcraft.jsch.Channel.connect(Channel.java:152)
at com.jcraft.jsch.Channel.connect(Channel.java:145)
at io.confluent.connect.sftp.connection.SftpConnection.newChannelFromSession(SftpConnection.java:104)
... 24 more
Caused by: java.io.IOException: inputstream is closed
at com.jcraft.jsch.ChannelSftp.fill(ChannelSftp.java:2911)
at com.jcraft.jsch.ChannelSftp.header(ChannelSftp.java:2935)
at com.jcraft.jsch.ChannelSftp.start(ChannelSftp.java:262)
... 27 more
Current Config:
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
name: update-sftp-sink-connector
namespace: confluent
spec:
name: update-sftp-sink-connector
taskMax: 1
class: io.confluent.connect.sftp.SftpSinkConnector
configs:
topics: updates-topic
file.delim: "."
sftp.host: "my.host.name"
sftp.port: "22"
sftp.username: ${file:/mnt/secrets/connect-connector-secrets/sftp-creds:username}
sftp.password: ${file:/mnt/secrets/connect-connector-secrets/sftp-creds:password}
sftp.working.dir: "/dev/customer-updates"
directory.delim: "/"
rotate.interval.ms: "120000"
flush.size: 1
partition.duration.ms: "120000"
partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
format.class: io.confluent.connect.sftp.sink.format.csv.CsvFormat
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
storage.class: io.confluent.connect.sftp.sink.storage.SftpSinkStorage
locale: en-GB
timezone: UTC
timestamp.extractor: Record
restartPolicy:
type: OnFailure
maxRetry: 10
connectClusterRef:
name: connect
namespace: confluent
Any ideas on what the issue could be and how to resolve it would be greatly appreciated