Help with error while using Amazon S3 sink connector

Hi,

I’ve been battling this issue for a couple of hours now and, despite my efforts, I can’t figure out where the problem lies.

I’m running Apache Kafka (bitnami/kafka:3.7.1), Kafka Connect (confluentinc/cp-kafka-connect:7.7.1), and a Schema Registry (confluentinc/cp-schema-registry:7.7.1), all within Docker.

I’m trying to sink JSON messages from my Kafka cluster to Parquet files in my S3 bucket.
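
For context, this is roughly how JSON Schema-serialized messages could be produced for this topic (a minimal Python sketch using confluent-kafka, not my actual producer; the schema and the "name" field are placeholders, but the topic and the createdAt field match my connector config, and localhost addresses assume the ports exposed by my compose file):

# Illustrative producer sketch: serializes a dict with JSON Schema via
# Schema Registry (wire format), which is what JsonSchemaConverter expects.
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Placeholder schema; my real payload is larger, but it does have createdAt.
schema_str = """
{
  "type": "object",
  "properties": {
    "name": {"type": "string"},
    "createdAt": {"type": "string"}
  }
}
"""

sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})
value_serializer = JSONSerializer(schema_str, sr_client)

producer = Producer({"bootstrap.servers": "localhost:9092"})
value = {"name": "example-product", "createdAt": "2024-11-01T12:00:00Z"}
producer.produce(
    "scraped-products",
    key="example-key",
    value=value_serializer(
        value, SerializationContext("scraped-products", MessageField.VALUE)
    ),
)
producer.flush()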

Here’s the configuration for my S3 connector:

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "scraped-products",
    "s3.region": "af-south-1",
    "s3.bucket.name": "harvesthawk-data-lake",
    "s3.part.size": "5242880",
    "flush.size": "100",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": false,
    "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "createdAt",
    "locale": "en",
    "timezone": "Africa/Johannesburg",
    "schema.compatibility": "NONE",
    "partition.duration.ms": "86400000",
    "rotate.schedule.interval.ms": "3600000"
  }
}
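
I create the connector by POSTing that JSON to the Connect REST API, roughly like this (a Python sketch using requests; localhost:8083 matches the port mapping in the compose file below, and the file name is just a placeholder for wherever I keep the config):

# Register the connector via the Kafka Connect REST API.
# Assumes the config above is saved as s3-sink-connector.json.
import json

import requests

with open("s3-sink-connector.json") as f:
    connector = json.load(f)

resp = requests.post("http://localhost:8083/connectors", json=connector)
resp.raise_for_status()
print(resp.json())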

And here are the docker-compose sections for my Kafka Connect and Schema Registry deployments:

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.7.1
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "kafka-connect"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_PLUGIN_PATH: "/usr/share/java, /usr/share/local-connectors"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      AWS_ACCESS_KEY_ID: "${AWS_ACCESS_KEY_ID}"
      AWS_SECRET_ACCESS_KEY: "${AWS_SECRET_ACCESS_KEY}"
    ports:
      - "8083:8083"
    volumes:
      - ./kafka_connect:/usr/share/local-connectors
    networks:
      - harvest_hawk_network
  kafka-schema-registry:
    image: confluentinc/cp-schema-registry:7.7.1
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
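
In case it’s relevant, the plugins the worker actually loaded can be checked through the Connect REST API; a quick sketch (same localhost:8083 assumption as above):

# List loaded connector plugins, to confirm the S3 sink jars under
# /usr/share/local-connectors were picked up by the worker.
import requests

for plugin in requests.get("http://localhost:8083/connector-plugins").json():
    print(plugin["class"], plugin.get("version"))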

When the connector attempts to start, the following error is raised:

[2024-11-01 12:09:08,381] ERROR Failed to start task s3-sink-connector-0 (org.apache.kafka.connect.runtime.Worker)
java.util.ServiceConfigurationError: io.confluent.kafka.schemaregistry.rules.RuleExecutor: io.confluent.kafka.schemaregistry.encryption.FieldEncryptionExecutor not a subtype
	at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:593)
	at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1244)
	at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1273)
	at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1309)
	at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1393)
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.addRuleObjectsFromServiceLoader(AbstractKafkaSchemaSerDe.java:276)
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.initRuleObjects(AbstractKafkaSchemaSerDe.java:244)
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.configureClientProperties(AbstractKafkaSchemaSerDe.java:205)
	at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.configure(AbstractKafkaJsonSchemaSerializer.java:61)
	at io.confluent.connect.json.JsonSchemaConverter$Serializer.<init>(JsonSchemaConverter.java:167)
	at io.confluent.connect.json.JsonSchemaConverter.configure(JsonSchemaConverter.java:80)
	at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:395)
	at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:634)
	at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:537)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1968)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$38(DistributedHerder.java:2018)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Any assistance would be appreciated :slight_smile: