Getting the below exception in the Kafka S3 sink connector

I am getting the below exception when using the S3 sink connector. Due to the nature of our data, we cannot specify a schema for the JSON, so I changed value.converter.schemas.enable=false, but I am still getting the exception below.

Exception:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because "schema" is null
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider.schemaHasArrayOfOptionalItems(ParquetRecordWriterProvider.java:142)
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:90)
at io.confluent.connect.s3.format.S3RetriableRecordWriter.write(S3RetriableRecordWriter.java:51)
at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:114)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:594)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:329)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:269)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:220)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:247)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
… 10 more

Config JSON (please assume the AWS-related values are configured correctly):
{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "topicname",
    "s3.region": "region_name",
    "s3.bucket.name": "bucket_name",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "schema.compatibility": "BACKWARD",
    "parquet.codec": "snappy",
    "flush.size": "1000",
    "aws.access.key.id": "sample_key",
    "aws.secret.access.key": "sample_secret",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Any help is appreciated. Thanks in advance.

This converter / format combination isn’t supported:

You must use the AvroConverter, ProtobufConverter, or JsonSchemaConverter with ParquetFormat for this connector. Attempting to use the JsonConverter (with or without schemas) results in a NullPointerException and a StackOverflowError.
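As a minimal sketch, assuming the topic's values are produced with a JSON Schema registered in Schema Registry (the registry URL below is a placeholder to replace with your own), the converter settings in the connector config would look like:

  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"

If the records genuinely have no schema, Parquet cannot be written at all, since Parquet files require one. One alternative (assuming plain JSON files in S3 are acceptable for your use case) is to keep the JsonConverter with schemas.enable=false and switch format.class to io.confluent.connect.s3.format.json.JsonFormat instead.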
