Hi,
I’ve been battling with this issue for a couple of hours now and, for all my effort, I can’t seem to figure out where it lies.
I’m running Apache Kafka (bitnami/kafka:3.7.1), Kafka Connect (confluentinc/cp-kafka-connect:7.7.1), and a Schema Registry (confluentinc/cp-schema-registry:7.7.1), all within Docker.
I’m trying to sink JSON messages from my Kafka cluster to Parquet files in my S3 bucket.
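For context, a message on the topic looks something like this (only createdAt matters here, since it’s the field the timestamp extractor reads; the other fields are illustrative):

{
  "productId": "abc-123",
  "name": "Example product",
  "price": 199.99,
  "createdAt": "2024-11-01T12:00:00+02:00"
}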
Here’s the configuration for my S3 connector:
{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "scraped-products",
    "s3.region": "af-south-1",
    "s3.bucket.name": "harvesthawk-data-lake",
    "s3.part.size": "5242880",
    "flush.size": "100",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
    "path.format": "year='YYYY'/month='MM'/day='dd'",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": false,
    "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "createdAt",
    "locale": "en",
    "timezone": "Africa/Johannesburg",
    "schema.compatibility": "NONE",
    "partition.duration.ms": "86400000",
    "rotate.schedule.interval.ms": "3600000"
  }
}
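For reference, I register the connector by POSTing that JSON to the Connect REST API (the payload above saved to a file; the filename is just what I happen to call it):

curl -X POST -H "Content-Type: application/json" \
  --data @s3-sink-connector.json \
  http://localhost:8083/connectors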
And here are the docker-compose sections for my Connect and Schema Registry deployments:
kafka-connect:
  image: confluentinc/cp-kafka-connect:7.7.1
  depends_on:
    kafka:
      condition: service_healthy
  environment:
    CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: "kafka-connect"
    CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
    CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
    CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
    CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
    CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
    CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
    CONNECT_PLUGIN_PATH: "/usr/share/java, /usr/share/local-connectors"
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    AWS_ACCESS_KEY_ID: "${AWS_ACCESS_KEY_ID}"
    AWS_SECRET_ACCESS_KEY: "${AWS_SECRET_ACCESS_KEY}"
  ports:
    - "8083:8083"
  volumes:
    - ./kafka_connect:/usr/share/local-connectors
  networks:
    - harvest_hawk_network

kafka-schema-registry:
  image: confluentinc/cp-schema-registry:7.7.1
  container_name: schema-registry
  ports:
    - 8081:8081
  depends_on:
    - kafka
  environment:
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
    SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
    SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
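For what it’s worth, these are the standard Connect REST calls I run to sanity-check the worker (output omitted here):

# list installed plugins, to confirm the S3 sink is on the plugin path
curl -s http://localhost:8083/connector-plugins

# inspect the connector and its task state
curl -s http://localhost:8083/connectors/s3-sink-connector/status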
When the connector attempts to start, the following error is raised:
[2024-11-01 12:09:08,381] ERROR Failed to start task s3-sink-connector-0 (org.apache.kafka.connect.runtime.Worker)
java.util.ServiceConfigurationError: io.confluent.kafka.schemaregistry.rules.RuleExecutor: io.confluent.kafka.schemaregistry.encryption.FieldEncryptionExecutor not a subtype
at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:593)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1244)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1273)
at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1309)
at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1393)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.addRuleObjectsFromServiceLoader(AbstractKafkaSchemaSerDe.java:276)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.initRuleObjects(AbstractKafkaSchemaSerDe.java:244)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.configureClientProperties(AbstractKafkaSchemaSerDe.java:205)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.configure(AbstractKafkaJsonSchemaSerializer.java:61)
at io.confluent.connect.json.JsonSchemaConverter$Serializer.<init>(JsonSchemaConverter.java:167)
at io.confluent.connect.json.JsonSchemaConverter.configure(JsonSchemaConverter.java:80)
at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:395)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:634)
at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:537)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1968)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$38(DistributedHerder.java:2018)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Any assistance would be appreciated.