SFTP Sink Connector with Parquet format

Hello there,

I'm trying to use the SFTP Sink connector to write data from a Kafka topic to files in formats such as Avro, JSON, and Parquet.

Between runs my configuration changes only format.class, as follows (see the sketch after this list):

  • io.confluent.connect.sftp.sink.format.avro.AvroFormat - Success
  • io.confluent.connect.sftp.sink.format.json.JsonFormat - Success
  • io.confluent.connect.sftp.sink.format.parquet.ParquetFormat - Failure
  • io.confluent.connect.sftp.sink.format.csv.CsvFormat - Success
  • io.confluent.connect.sftp.sink.format.tsv.TsvFormat - Success
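
For example, the successful Avro run and the failing Parquet run differ only in this one line of the full config posted further down; everything else is identical:

    "format.class": "io.confluent.connect.sftp.sink.format.avro.AvroFormat"       <- works
    "format.class": "io.confluent.connect.sftp.sink.format.parquet.ParquetFormat" <- fails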

When using Parquet, I got this error:

[2023-08-23 09:14:13,316] ERROR Error while closing the writer: stream already closed (io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter)
[2023-08-23 09:14:13,316] ERROR Exception on topic partition Topic-0:  (io.confluent.connect.sftp.sink.SftpSinkTask)
org.apache.kafka.connect.errors.RetriableException: Record commit failed
        at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:108)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:484)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:465)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.executeState(TopicPartitionWriter.java:176)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.write(TopicPartitionWriter.java:158)
        at io.confluent.connect.sftp.sink.SftpSinkTask.put(SftpSinkTask.java:153)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: stream already closed
        at com.jcraft.jsch.ChannelSftp$1.write(ChannelSftp.java:800)
        at io.confluent.connect.sftp.sink.storage.SftpOutputStream.write(SftpOutputStream.java:114)
        at io.confluent.connect.sftp.sink.format.parquet.ParquetOutputFile$1$2.write(ParquetOutputFile.java:82)
        at java.base/java.io.OutputStream.write(OutputStream.java:122)
        at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
        at org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:620)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:241)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:319)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
        at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)
        at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:105)
        ... 16 more
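
My guess, which I have not verified, is that Parquet exercises the stream lifecycle differently from the row-oriented formats: the trace shows ParquetWriter.close() still flushing a buffered row group (flushRowGroupToStore) and it would then write the file footer, but by that point the underlying SftpOutputStream has already been closed. To rule out time-based rotation closing the stream mid-commit, a variation to test would be the same config as below but committing by record count only:

    "flush.size": "15"
    (with "rotate.interval.ms" removed entirely)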

I tried increasing flush.size and got a new error:

[2023-08-23 16:30:25,723] ERROR Error while closing the writer: can not write PageHeader(type:DICTIONARY_PAGE, uncompressed_page_size:20, compressed_page_size:22, crc:-1202185600, dictionary_page_header:DictionaryPageHeader(num_values:5, encoding:PLAIN_DICTIONARY)) (io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter)
[2023-08-23 16:30:25,723] ERROR Exception on topic partition BAY_DBZSRC_Users-0:  (io.confluent.connect.sftp.sink.SftpSinkTask)
org.apache.kafka.connect.errors.RetriableException: Record commit failed
        at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:108)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:484)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:465)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.executeState(TopicPartitionWriter.java:176)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.write(TopicPartitionWriter.java:158)
        at io.confluent.connect.sftp.sink.SftpSinkTask.put(SftpSinkTask.java:153)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: can not write PageHeader(type:DICTIONARY_PAGE, uncompressed_page_size:20, compressed_page_size:22, crc:-1202185600, dictionary_page_header:DictionaryPageHeader(num_values:5, encoding:PLAIN_DICTIONARY))
        at org.apache.parquet.format.Util.write(Util.java:233)
        at org.apache.parquet.format.Util.writePageHeader(Util.java:74)
        at org.apache.parquet.format.converter.ParquetMetadataConverter.writeDictionaryPageHeader(ParquetMetadataConverter.java:1532)
        at org.apache.parquet.hadoop.ParquetFileWriter.writeDictionaryPage(ParquetFileWriter.java:403)
        at org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:612)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:241)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:319)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
        at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)
        at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:105)
        ... 16 more
Caused by: shaded.parquet.org.apache.thrift.transport.TTransportException: java.io.IOException: stream already closed
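
Note the last Caused by: both runs bottom out in the same java.io.IOException: stream already closed from jsch's ChannelSftp, so the larger flush.size only changed where the failure surfaced, not the root cause. To see when the connector closes the SFTP stream relative to the Parquet commit, the log level for the connector package can be raised at runtime via the standard Connect dynamic-logging endpoint (assuming the admin endpoint is enabled on this worker):

curl -X PUT http://localhost:8013/admin/loggers/io.confluent.connect.sftp \
    -H 'Content-Type: application/json' \
    -d '{"level": "DEBUG"}'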

Here is my configuration:

curl -X POST \
    http://localhost:8013/connectors \
    -H 'Content-Type: application/json' \
    -d '{
  "name": "SFTP_Sink",
  "config": {
    "connector.class": "io.confluent.connect.sftp.SftpSinkConnector",
    "tasks.max": "1",

    "topics": "Users",
    "confluent.topic.bootstrap.servers": "broker:9092",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url":"http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081",

    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "120000",
    "path.format": "'\''year_month'\''=YYYY_MM/'\''day_hour_min'\''=dd_hh_mm",
    "locale": "en-US",
    "timezone": "Asia/Bangkok",
    
    "flush.size": "15",
    "rotate.interval.ms": "10000",

    "format.class": "io.confluent.connect.sftp.sink.format.parquet.ParquetFormat",
    "storage.class": "io.confluent.connect.sftp.sink.storage.SftpSinkStorage",

    "sftp.host": "vm1",
    "sftp.port": "22",
    "sftp.username": "sftpuser",
    "sftp.password": "psswd",
    "sftp.working.dir": "/home/sftpuser/parquet"
  }
}' | jq .
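
For completeness, the connector and task state after each change can be checked with the standard Connect status endpoint:

curl -s http://localhost:8013/connectors/SFTP_Sink/status | jq .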
