Hi,
According to this, the S3 Sink Connector is supposed to support cloud storage providers that expose an S3-compatible API. I'm running into an error when trying to use a GCS bucket with the S3 Sink Connector.
To do this, I'm using a GCS bucket with an HMAC key. I've tested the bucket with this HMAC key and confirmed that, using the AWS CLI, I can put objects into it, so I'm confident the IAM permissions are correct and the HMAC key is valid.
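For reference, this is roughly the AWS CLI test I used to verify the HMAC pair (the object key and local file are just placeholders; the bucket name matches the one in the config below):

export AWS_ACCESS_KEY_ID='< HMAC access key id >'
export AWS_SECRET_ACCESS_KEY='< HMAC Secret Key >'
aws s3api put-object \
  --endpoint-url https://storage.googleapis.com \
  --bucket my-gcs-bucket-test-s3-sink-example \
  --key test-object.json \
  --body ./test-object.json

That put-object call succeeds against the GCS endpoint, so the credentials and bucket permissions work outside of Kafka Connect.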
Below is a representative version of my connector configuration:
{
  "topics": "gcs-test-topic",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.bucket.name": "my-gcs-bucket-test-s3-sink-example",
  "partition.duration.ms": "300000",
  "s3.part.size": "8388608",
  "aws.access.key.id": "< HMAC access key id >",
  "errors.log.enable": "true",
  "name": "gcs-s3-sink-connector",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "s3.region": "auto",
  "store.url": "https://storage.googleapis.com",
  "timezone": "UTC",
  "tasks.max": "3",
  "flush.size": "6000",
  "locale": "US",
  "rotate.interval.ms": "240000",
  "timestamp.extractor": "Record",
  "errors.log.include.messages": "true",
  "rotate.schedule.interval.ms": "240000",
  "errors.tolerance": "all",
  "s3.compression.type": "gzip",
  "aws.secret.access.key": "< HMAC Secret Key >"
}
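In case it's relevant, I'm registering the connector through the Connect REST API, roughly like this (localhost:8083 stands in for my actual worker address, and the JSON above is saved as gcs-s3-sink-connector.json):

curl -X PUT -H 'Content-Type: application/json' \
  --data @gcs-s3-sink-connector.json \
  http://localhost:8083/connectors/gcs-s3-sink-connector/config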
This is the error I'm encountering:
org.apache.kafka.connect.errors.ConnectException: com.amazonaws.services.s3.model.AmazonS3Exception: Invalid argument. (Service: Amazon S3; Status Code: 400; Error Code: InvalidArgument; Request ID: null; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:187)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Invalid argument. (Service: Amazon S3; Status Code: 400; Error Code: InvalidArgument; Request ID: null; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null