Hello,
I am trying to configure the SFTP Sink connector to write data from a Kafka topic to files in formats such as Avro, JSON, and Parquet.
The only setting I change between runs is format.class, with the following results:
- io.confluent.connect.sftp.sink.format.avro.AvroFormat - Success
- io.confluent.connect.sftp.sink.format.json.JsonFormat - Success
- io.confluent.connect.sftp.sink.format.parquet.ParquetFormat - Failed
- io.confluent.connect.sftp.sink.format.csv.CsvFormat - Success
- io.confluent.connect.sftp.sink.format.tsv.TsvFormat - Success
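The setup can be sketched as follows: a minimal Python script that builds one connector config per format, varying only format.class. The five format classes are the ones listed above, and the connector name TEST_SFTP_SINK and topic TEST_DATA are taken from the log in this post. Everything else (the connector.class value, the sftp.* keys, flush.size, and all host and credential values) is a placeholder assumed for illustration and should be checked against the connector documentation.

```python
import json

# Format classes from the list above; only format.class varies between runs.
FORMAT_CLASSES = {
    "avro": "io.confluent.connect.sftp.sink.format.avro.AvroFormat",
    "json": "io.confluent.connect.sftp.sink.format.json.JsonFormat",
    "parquet": "io.confluent.connect.sftp.sink.format.parquet.ParquetFormat",
    "csv": "io.confluent.connect.sftp.sink.format.csv.CsvFormat",
    "tsv": "io.confluent.connect.sftp.sink.format.tsv.TsvFormat",
}

# Base settings shared by every run. These keys and values are assumptions
# (placeholders), not the actual settings behind the error in this post.
BASE_CONFIG = {
    "connector.class": "io.confluent.connect.sftp.SftpSinkConnector",
    "topics": "TEST_DATA",
    "tasks.max": "1",
    "flush.size": "3",
    "sftp.host": "sftp.example.com",
    "sftp.port": "22",
    "sftp.username": "user",
    "sftp.password": "secret",
    "sftp.working.dir": "/upload",
}


def connector_config(fmt):
    """Return a Connect REST-style payload for the given output format."""
    config = dict(BASE_CONFIG)  # copy so the shared base is never mutated
    config["format.class"] = FORMAT_CLASSES[fmt]
    return {"name": "TEST_SFTP_SINK", "config": config}


if __name__ == "__main__":
    # Print the Parquet variant, the one that fails.
    print(json.dumps(connector_config("parquet"), indent=2))
```

The printed JSON has the shape the Kafka Connect REST API accepts when creating a connector; swapping "parquet" for "avro", "json", "csv", or "tsv" gives the variants that succeed.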
How do I configure the SFTP Sink connector to write data in Parquet format?
When I set format.class to io.confluent.connect.sftp.sink.format.parquet.ParquetFormat, expecting Parquet output files, I get the following error:
ERROR [TEST_SFTP_SINK|task-0] Error closing writer for TEST_DATA-0. Error: java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: BLOCK (io.confluent.connect.sftp.sink.SftpSinkTask:199)
ERROR [TEST_SFTP_SINK|task-0] WorkerSinkTask{id=TEST_SFTP_SINK-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:221)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:625)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:337)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:213)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:268)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:165)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:122)
at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:53)
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:315)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:152)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:27)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)
at io.confluent.connect.sftp.sink.format.parquet.ParquetRecordWriter.commit(ParquetRecordWriter.java:105)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:484)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:465)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.executeState(TopicPartitionWriter.java:176)
at io.confluent.connect.sftp.sink.TopicPartitionWriter.write(TopicPartitionWriter.java:158)
at io.confluent.connect.sftp.sink.SftpSinkTask.put(SftpSinkTask.java:153)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:590)
Thanks
May