Can the SFTP Sink connector write Parquet files?

Hello,

I am trying to configure the SFTP Sink connector to write data from a Kafka topic to various file formats, such as Avro, JSON, and Parquet.

Between runs I change only format.class, with the following results (a sketch of my full config follows the list):

  • io.confluent.connect.sftp.sink.format.avro.AvroFormat - Success
  • io.confluent.connect.sftp.sink.format.json.JsonFormat - Success
  • io.confluent.connect.sftp.sink.format.parquet.ParquetFormat - Failed
  • io.confluent.connect.sftp.sink.format.csv.CsvFormat - Success
  • io.confluent.connect.sftp.sink.format.tsv.TsvFormat - Success
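For reference, here is a sketch of the connector config I am using. The host, credentials, and directory are placeholders and a few settings are omitted; check the exact property names against the SFTP Sink docs for your version:

{
  "name": "TEST_SFTP_SINK",
  "config": {
    "connector.class": "io.confluent.connect.sftp.SftpSinkConnector",
    "topics": "TEST_DATA",
    "flush.size": "1000",
    "format.class": "io.confluent.connect.sftp.sink.format.parquet.ParquetFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "sftp.host": "sftp.example.com",
    "sftp.port": "22",
    "sftp.username": "user",
    "sftp.password": "********",
    "sftp.working.dir": "/upload"
  }
}

As far as I understand, ParquetFormat needs schema-aware records, which is why I use AvroConverter with Schema Registry rather than a schemaless JSON converter.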

How do I configure the SFTP Sink connector to write data in Parquet format?

When I configured format.class as io.confluent.connect.sftp.sink.format.parquet.ParquetFormat, expecting Parquet output files, I got the following error:

ERROR [TEST_SFTP_SINK|task-0] Error closing writer for TEST_DATA-0. Error: java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: BLOCK (io.confluent.connect.sftp.sink.SftpSinkTask:199)

ERROR [TEST_SFTP_SINK|task-0] WorkerSinkTask{id=TEST_SFTP_SINK-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:221)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:625)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:337)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:213)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:268)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
        at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
        at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
        at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
        at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:165)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:122)
        at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:53)
        at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:315)
        at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:152)
        at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:27)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
        at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)
        at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:105)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:484)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:465)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.executeState(TopicPartitionWriter.java:176)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.write(TopicPartitionWriter.java:158)
        at io.confluent.connect.sftp.sink.SftpSinkTask.put(SftpSinkTask.java:153)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:590)

Thanks
May

I think this is a known issue in recent versions of several connectors, where the snappy-java dependency is not bundled with the plugin.

You need to download the snappy-java library JAR and place it on your Connect worker's classpath:

https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java
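Something like this on the worker host (the version and plugin path here are illustrative; adjust them to your installation):

# download snappy-java from Maven Central (pick a current version)
wget https://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar

# drop it into the SFTP connector plugin's lib directory (path depends on how Connect was installed)
cp snappy-java-1.1.10.5.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-sftp/lib/

Then restart the Connect worker so the plugin classloader picks up the new JAR.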


Thank you

For the Snappy issue, I fixed it by upgrading the connector from version 3.1.15 to 3.1.16, which already bundles snappy-java.
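In case it helps anyone else: I install connectors through Confluent Hub, so assuming you do too, the upgrade was just the following command plus a Connect worker restart:

confluent-hub install confluentinc/kafka-connect-sftp:3.1.16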

But now I am facing a new error, shown below. Have you ever seen this one?

ERROR [TEST_SFTP|task-0] Error while closing the writer: stream already closed (io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter:107)
ERROR [TEST_SFTP|task-0] Exception on topic partition TEST_SFTP-0:  (io.confluent.connect.sftp.sink.SftpSinkTask:155)
org.apache.kafka.connect.errors.RetriableException: Record commit failed
        at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:108)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:484)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:465)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.executeState(TopicPartitionWriter.java:176)
        at io.confluent.connect.sftp.sink.TopicPartitionWriter.write(TopicPartitionWriter.java:158)
        at io.confluent.connect.sftp.sink.SftpSinkTask.put(SftpSinkTask.java:153)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:590)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:337)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:213)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:268)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: stream already closed

I’ve not seen this one, no.

The SFTP Sink connector requires a Confluent license. If you have an enterprise contract with Confluent, I suggest opening a ticket with the support team so they can take a look.
