SFTP Sink Connector - Failed to open new SFTP channel with existing session

The SFTP sink connector keeps failing with "Failed to open new SFTP channel with existing session".

Error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to open new SFTP channel with existing session.
	at io.confluent.connect.sftp.connection.SftpConnection.newChannelFromSession(SftpConnection.java:108)
	at io.confluent.connect.sftp.sink.storage.SftpOutputStream.createPath(SftpOutputStream.java:48)
	at io.confluent.connect.sftp.sink.storage.SftpOutputStream.<init>(SftpOutputStream.java:44)
	at io.confluent.connect.sftp.sink.storage.SftpSinkStorage.create(SftpSinkStorage.java:87)
	at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriter.<init>(CsvRecordWriter.java:32)
	at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriterProvider.getRecordWriter(CsvRecordWriterProvider.java:29)
	at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriterProvider.getRecordWriter(CsvRecordWriterProvider.java:12)
	at io.confluent.connect.sftp.sink.TopicPartitionWriter.getWriter(TopicPartitionWriter.java:408)
	at io.confluent.connect.sftp.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:454)
	at io.confluent.connect.sftp.sink.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:250)
	at io.confluent.connect.sftp.sink.TopicPartitionWriter.writePartitionWhenPaused(TopicPartitionWriter.java:208)
	at io.confluent.connect.sftp.sink.TopicPartitionWriter.executeState(TopicPartitionWriter.java:177)
	at io.confluent.connect.sftp.sink.TopicPartitionWriter.write(TopicPartitionWriter.java:158)
	at io.confluent.connect.sftp.sink.SftpSinkTask.put(SftpSinkTask.java:154)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
	... 10 more
Caused by: com.jcraft.jsch.JSchException: java.io.IOException: inputstream is closed
	at com.jcraft.jsch.ChannelSftp.start(ChannelSftp.java:315)
	at com.jcraft.jsch.Channel.connect(Channel.java:152)
	at com.jcraft.jsch.Channel.connect(Channel.java:145)
	at io.confluent.connect.sftp.connection.SftpConnection.newChannelFromSession(SftpConnection.java:104)
	... 24 more
Caused by: java.io.IOException: inputstream is closed
	at com.jcraft.jsch.ChannelSftp.fill(ChannelSftp.java:2911)
	at com.jcraft.jsch.ChannelSftp.header(ChannelSftp.java:2935)
	at com.jcraft.jsch.ChannelSftp.start(ChannelSftp.java:262)
	... 27 more
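The innermost cause, java.io.IOException: inputstream is closed, is how JSch reports that the cached SSH session underneath the connector has already died when it tries to open a new SFTP channel on it. A common trigger is an idle-timeout: the SFTP server (or a firewall/NAT in between) silently drops the TCP connection between flushes, and the next write then fails. One mitigation worth trying, assuming the server runs OpenSSH and you can change its config, is enabling server-side keep-alives so idle sessions are probed rather than dropped (a sketch, not a confirmed fix for this connector):

```
# /etc/ssh/sshd_config on the SFTP server (assumption: OpenSSH sshd)
# Probe an idle client every 60 seconds; only tear the session down
# after 3 consecutive unanswered probes (~3 minutes of real silence).
ClientAliveInterval 60
ClientAliveCountMax 3
```

If you control only the client side, the equivalent knob for interactive testing is the OpenSSH client's ServerAliveInterval option; whether the connector's embedded JSch session sends keep-alives is internal to the plugin, so reducing idle time (e.g. via rotation/flush settings) may also help.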

Current Config:

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: update-sftp-sink-connector
  namespace: confluent
spec:
  name: update-sftp-sink-connector
  taskMax: 1
  class: io.confluent.connect.sftp.SftpSinkConnector
  configs:
    topics: updates-topic
    file.delim: "."
    sftp.host: "my.host.name"
    sftp.port: "22"
    sftp.username: ${file:/mnt/secrets/connect-connector-secrets/sftp-creds:username}
    sftp.password: ${file:/mnt/secrets/connect-connector-secrets/sftp-creds:password}
    sftp.working.dir: "/dev/customer-updates"
    directory.delim: "/"
    rotate.interval.ms: "120000"
    flush.size: 1
    partition.duration.ms: "120000"
    partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    format.class: io.confluent.connect.sftp.sink.format.csv.CsvFormat
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    storage.class: io.confluent.connect.sftp.sink.storage.SftpSinkStorage
    locale: en-GB
    timezone: UTC
    timestamp.extractor: Record
  restartPolicy:
    type: OnFailure
    maxRetry: 10
  connectClusterRef:
    name: connect
    namespace: confluent
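Since the config above points at my.host.name:22, one quick sanity check from inside the Connect worker pod is to confirm the host is reachable and answers with an SSH banner at all; a firewall that silently drops traffic will time out here instead. This is a small stdlib sketch (read_ssh_banner is a hypothetical helper, not part of the connector):

```python
import socket


def read_ssh_banner(host: str, port: int = 22, timeout: float = 5.0) -> str:
    """Open a TCP connection and return the server's SSH identification line.

    A healthy SFTP endpoint replies immediately with something like
    "SSH-2.0-OpenSSH_8.4"; a silently-dropping firewall times out instead.
    """
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.settimeout(timeout)
        banner = sock.recv(255).decode("ascii", errors="replace").strip()
    if not banner.startswith("SSH-"):
        raise RuntimeError(f"unexpected banner from {host}:{port}: {banner!r}")
    return banner
```

For example, read_ssh_banner("my.host.name") from the worker pod should return the server's version string; if it raises a timeout, the session drops are likely a network/firewall issue rather than the connector itself.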

Any ideas on what the issue could be, and how to resolve it, would be greatly appreciated.
