Hi everyone.
I’m working with the Kafka Connect S3 Sink Connector, but the default partitioners don’t meet our requirements, so I had to write a custom one.
After building a custom partitioner from GitHub (Kafka Connect Field and Time Based Partitioner), I copied the jar to /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib, restarted the Confluent local services, and registered the S3 sink connector. The registration request returned {"error_code": 500, "message": null}. More details about the problem:
- Connect worker log:
[2022-01-14 15:52:40,130] ERROR Uncaught exception in REST call to /connectors/ (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:304)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:249)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:644)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:652)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:560)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:543)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:144)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:465)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:365)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
- Connector registration request:
POST http://127.0.0.1:8083/connectors/
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "10",
    "topics": "xxxxxxxxx",
    "s3.region": "xxxxxxxx",
    "s3.bucket.name": "xxxxxxxxx",
    "s3.part.size": "5242880",
    "flush.size": "10",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE",
    "name": "s3-sink",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://127.0.0.1:8081/",
    "partitioner.class": "com.canelmas.kafka.connect.FieldAndTimeBasedPartitioner",
    "partition.field.name": "ecomPlatform,customerId",
    "partition.duration.ms": 86400000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "locale": "US",
    "timezone": "UTC"
  }
}
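For context, this is roughly the S3 key layout I expect that config (partition.field.name=ecomPlatform,customerId plus the daily path.format in UTC) to produce. This is just a Python sketch to illustrate the intended prefixes, not the connector’s actual code, and the record values are made up:

```python
from datetime import datetime, timezone

def expected_partition_path(record: dict, ts_ms: int) -> str:
    """Sketch of the S3 prefix I expect FieldAndTimeBasedPartitioner to build
    for partition.field.name=ecomPlatform,customerId and
    path.format='year'=YYYY/'month'=MM/'day'=dd with timezone=UTC."""
    fields = ["ecomPlatform", "customerId"]  # from partition.field.name
    field_part = "/".join(f"{f}={record[f]}" for f in fields)
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    time_part = dt.strftime("year=%Y/month=%m/day=%d")
    return f"{field_part}/{time_part}"

# Made-up example record and timestamp (2022-01-14 UTC):
print(expected_partition_path(
    {"ecomPlatform": "shopee", "customerId": "42"},
    1642148400000,
))
# -> ecomPlatform=shopee/customerId=42/year=2022/month=01/day=14
```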
- Platform: Confluent Community edition on Ubuntu 20.04
I really appreciate your support.