Hello there,
I am trying to use the SFTP Sink connector to write data from a Kafka topic to files in formats such as Avro, JSON, and Parquet.
Between runs, my configuration changes only format.class, with the following results:
- io.confluent.connect.sftp.sink.format.avro.AvroFormat - Success
- io.confluent.connect.sftp.sink.format.json.JsonFormat - Success
- io.confluent.connect.sftp.sink.format.parquet.ParquetFormat - Failed
- io.confluent.connect.sftp.sink.format.csv.CsvFormat - Success
- io.confluent.connect.sftp.sink.format.tsv.TsvFormat - Success
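To be concrete, the only line that differs between these runs is format.class; everything else is identical to the full config at the end of this post. For example, the working Avro run versus the failing Parquet run:

  "format.class": "io.confluent.connect.sftp.sink.format.avro.AvroFormat"        <- succeeds
  "format.class": "io.confluent.connect.sftp.sink.format.parquet.ParquetFormat"  <- fails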
When using Parquet, I get this error:
[2023-08-23 09:14:13,316] ERROR Error while closing the writer: stream already closed (io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter)
[2023-08-23 09:14:13,316] ERROR Exception on topic partition Topic-0: (io.confluent.connect.sftp.sink.SftpSinkTask)
org.apache.kafka.connect.errors.RetriableException: Record commit failed
at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:108)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:484)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:465)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.executeState(TopicPartitionWriter.java:176)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.write(TopicPartitionWriter.java:158)
at io.confluent.connect.sftp.sink.SftpSinkTask.put(SftpSinkTask.java:153)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: stream already closed
at com.jcraft.jsch.ChannelSftp$1.write(ChannelSftp.java:800)
at io.confluent.connect.sftp.sink.storage.SftpOutputStream.write(SftpOutputStream.java:114)
at io.confluent.connect.sftp.sink.format.parquet.ParquetOutputFile$1$2.write(ParquetOutputFile.java:82)
at java.base/java.io.OutputStream.write(OutputStream.java:122)
at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
at org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:620)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:241)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:319)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)
at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:105)
... 16 more
I also tried increasing flush.size and got a new error.
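For reference, the only change was to that one property, along these lines (illustrative value; the exact number I tried isn't the point):

  "flush.size": "1000"

With that change, the error became: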
[2023-08-23 16:30:25,723] ERROR Error while closing the writer: can not write PageHeader(type:DICTIONARY_PAGE, uncompressed_page_size:20, compressed_page_size:22, crc:-1202185600, dictionary_page_header:DictionaryPageHeader(num_values:5, encoding:PLAIN_DICTIONARY)) (io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter)
[2023-08-23 16:30:25,723] ERROR Exception on topic partition BAY_DBZSRC_Users-0: (io.confluent.connect.sftp.sink.SftpSinkTask)
org.apache.kafka.connect.errors.RetriableException: Record commit failed
at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:108)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:484)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:465)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.executeState(TopicPartitionWriter.java:176)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.write(TopicPartitionWriter.java:158)
at io.confluent.connect.sftp.sink.SftpSinkTask.put(SftpSinkTask.java:153)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: can not write PageHeader(type:DICTIONARY_PAGE, uncompressed_page_size:20, compressed_page_size:22, crc:-1202185600, dictionary_page_header:DictionaryPageHeader(num_values:5, encoding:PLAIN_DICTIONARY))
at org.apache.parquet.format.Util.write(Util.java:233)
at org.apache.parquet.format.Util.writePageHeader(Util.java:74)
at org.apache.parquet.format.converter.ParquetMetadataConverter.writeDictionaryPageHeader(ParquetMetadataConverter.java:1532)
at org.apache.parquet.hadoop.ParquetFileWriter.writeDictionaryPage(ParquetFileWriter.java:403)
at org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:612)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:241)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:319)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)
at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:105)
... 16 more
Caused by: shaded.parquet.org.apache.thrift.transport.TTransportException: java.io.IOException: stream already closed
Here is my configuration:
curl -X POST \
  http://localhost:8013/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "SFTP_Sink",
    "config": {
      "connector.class": "io.confluent.connect.sftp.SftpSinkConnector",
      "tasks.max": "1",
      "topics": "Users",
      "confluent.topic.bootstrap.servers": "broker:9092",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://schema-registry:8081",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "partition.duration.ms": "120000",
      "path.format": "'\''year_month'\''=YYYY_MM/'\''day_hour_min'\''=dd_hh_mm",
      "locale": "en-US",
      "timezone": "Asia/Bangkok",
      "flush.size": "15",
      "rotate.interval.ms": "10000",
      "format.class": "io.confluent.connect.sftp.sink.format.parquet.ParquetFormat",
      "storage.class": "io.confluent.connect.sftp.sink.storage.SftpSinkStorage",
      "sftp.host": "vm1",
      "sftp.port": "22",
      "sftp.username": "sftpuser",
      "sftp.password": "psswd",
      "sftp.working.dir": "/home/sftpuser/parquet"
    }
  }' | jq .