Custom partitioner for S3 Sink Connector

Hi everyone.

I’m working with the Kafka Connect S3 Sink Connector, but the partitioners provided by default don’t meet our requirements, so I have to use a custom partitioner.
After pulling and building a custom partitioner from GitHub (Kafka Connect Field and Time Based Partitioner), I copied the jar file to /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib, restarted the local Confluent services, and tried to register the S3 sink connector. The request returned {"error_code": 500, "message": null}. More details about the problem:

  1. Connect’s log:
[2022-01-14 15:52:40,130] ERROR Uncaught exception in REST call to /connectors/ (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
java.lang.NullPointerException
	at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:304)
	at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
	at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:249)
	at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:644)
	at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:652)
	at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:560)
	at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:543)
	at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
	at org.apache.kafka.connect.connector.Connector.validate(Connector.java:144)
	at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:465)
	at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:365)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
  2. Connector registration request:
POST http://127.0.0.1:8083/connectors/

{
    "name": "s3-sink",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "10",
        "topics": "xxxxxxxxx",
        "s3.region": "xxxxxxxx",
        "s3.bucket.name": "xxxxxxxxx",
        "s3.part.size": "5242880",
        "flush.size": "10",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "schema.compatibility": "NONE",
        "name": "s3-sink",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://127.0.0.1:8081/",
        "partitioner.class": "com.canelmas.kafka.connect.FieldAndTimeBasedPartitioner",
        "partition.field.name":"ecomPlatform,customerId",
        "partition.duration.ms" : 86400000,
        "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
        "locale" : "US",
        "timezone" : "UTC"
    }
}
  3. Platform: Confluent Community on Ubuntu 20.04
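For reference, the build-and-install steps I followed were roughly the following. This is a sketch: the repository URL is assumed from the project name, the build tool is assumed to be Maven, and the jar name pattern may differ in your build.

```shell
# Build the partitioner from source (repo URL assumed from the project name)
git clone https://github.com/canelmas/kafka-connect-field-and-time-partitioner.git
cd kafka-connect-field-and-time-partitioner
mvn clean package

# Copy the resulting jar next to the S3 sink connector's own jars
# (jar name pattern is an assumption; check your target/ directory)
cp target/kafka-connect-field-and-time-partitioner-*.jar \
   /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib/

# Restart local services so Kafka Connect reloads the plugin path
confluent local services stop
confluent local services start
```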

I really appreciate your support.

You seem to be missing timestamp.field, which is used for partitioning on time.
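To illustrate: assuming this partitioner follows the standard TimeBasedPartitioner settings from Confluent's storage common, the partitioner-related part of the config would gain two keys along these lines. The choice of RecordField and the field name yourTimestampField are assumptions; point it at a real timestamp field in your records.

```json
{
    "partitioner.class": "com.canelmas.kafka.connect.FieldAndTimeBasedPartitioner",
    "partition.field.name": "ecomPlatform,customerId",
    "partition.duration.ms": 86400000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "locale": "US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "yourTimestampField"
}
```

With timestamp.extractor set to Wallclock no field is needed; RecordField reads the timestamp from the named field in each record, and Record uses the Kafka record's own timestamp.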

