I am getting the exception below when using the S3 sink connector. Due to the nature of our data we cannot specify a schema for the JSON,
so I set value.converter.schemas.enable=false, but I still get the same exception.
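For illustration, the messages on the topic are plain JSON with arbitrary, varying fields; the field names and values below are made up, but the shape is representative:

{"order_id": "A123", "items": [{"sku": "X1", "qty": 2}], "created_at": 1718000000}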
Exception:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because "schema" is null
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider.schemaHasArrayOfOptionalItems(ParquetRecordWriterProvider.java:142)
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:90)
at io.confluent.connect.s3.format.S3RetriableRecordWriter.write(S3RetriableRecordWriter.java:51)
at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:114)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:594)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:329)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:269)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:220)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:247)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
… 10 more
Config JSON (please assume the AWS-related values are configured correctly):
{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "topicname",
    "s3.region": "region_name",
    "s3.bucket.name": "bucket_name",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "schema.compatibility": "BACKWARD",
    "parquet.codec": "snappy",
    "flush.size": "1000",
    "aws.access.key.id": "sample_key",
    "aws.secret.access.key": "sample_secret",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
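For completeness: with value.converter.schemas.enable=true, JsonConverter would expect every message to carry its own schema/payload envelope, which our producers cannot emit. A made-up example of that envelope, for a record like the sample above:

{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "order_id", "type": "string", "optional": false},
      {"field": "created_at", "type": "int64", "optional": true}
    ],
    "optional": false
  },
  "payload": {
    "order_id": "A123",
    "created_at": 1718000000
  }
}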
Any help is appreciated. Thanks in advance.