S3 Sink Connector with a GCS Bucket

Hi,

According to this, the S3 Sink Connector is supposed to support cloud storage providers that expose an S3-compatible API. However, I’m running into an error when trying to use a GCS bucket with the S3 Sink Connector.

To do this, I’m using a GCS bucket with an HMAC key. I’ve tested the bucket with this HMAC key and confirmed that, using the AWS CLI, I can put objects into it. So I’m confident that the IAM permissions are correct and that the HMAC key is valid.
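For reference, here is a minimal sketch of roughly the same check expressed with the AWS SDK for Java v1 (the SDK the connector itself uses, judging by the stack trace further down). The bucket name and HMAC key values are placeholders, and the endpoint/region mirror the connector config below:

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class GcsHmacPutCheck {
    public static void main(String[] args) {
        // HMAC key pair issued for the GCS bucket (placeholders).
        BasicAWSCredentials hmac =
                new BasicAWSCredentials("<HMAC access key id>", "<HMAC secret key>");

        // Point the S3 client at GCS's S3-compatible XML API endpoint,
        // mirroring store.url and s3.region from the connector config.
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(hmac))
                .withEndpointConfiguration(
                        new EndpointConfiguration("https://storage.googleapis.com", "auto"))
                .build();

        // A plain PutObject works against the bucket, same as the CLI test.
        s3.putObject("my-gcs-bucket-test-s3-sink-example",
                "connectivity-check.txt", "hello from the aws sdk");
        System.out.println("putObject succeeded");
    }
}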

Below is a representative connector configuration:

{
  "topics": "gcs-test-topic",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.bucket.name": "my-gcs-bucket-test-s3-sink-example",
  "partition.duration.ms": "300000",
  "s3.part.size": "8388608",
  "aws.access.key.id": "< HMAC access key id >",
  "errors.log.enable": "true",
  "name": "gcs-s3-sink-connector",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "s3.region": "auto",
  "store.url": "https://storage.googleapis.com",
  "timezone": "UTC",
  "tasks.max": "3",
  "flush.size": "6000",
  "locale": "US",
  "rotate.interval.ms": "240000",
  "timestamp.extractor": "Record",
  "errors.log.include.messages": "true",
  "rotate.schedule.interval.ms": "240000",
  "errors.tolerance": "all",
  "s3.compression.type": "gzip",
  "aws.secret.access.key": "< HMAC Secret Key >"
}

This is the error I’m encountering:

org.apache.kafka.connect.errors.ConnectException: com.amazonaws.services.s3.model.AmazonS3Exception: Invalid argument. (Service: Amazon S3; Status Code: 400; Error Code: InvalidArgument; Request ID: null; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:187)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Invalid argument. (Service: Amazon S3; Status Code: 400; Error Code: InvalidArgument; Request ID: null; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null

Before you go too far down the debugging road, how about using the GCS Sink Connector? Or is there a particular feature you need that is only in the S3 Sink Connector?


The GCS Sink Connector requires a license that I don’t currently have access to, while the S3 Sink is open source.

I went down this rabbit hole, and it looks like the connector uses a method from the AWS SDK for Java called doesBucketExistV2. You can see it in use here. From the error logs I can see that this is where my error is coming from. Apparently that method breaks interoperability with other S3-compatible APIs, according to this issue.
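That call can be isolated from the rest of the connector with a small standalone check along these lines (again, the bucket name and HMAC key are placeholders; the endpoint and region match my connector config):

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;

public class DoesBucketExistV2Check {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("<HMAC access key id>", "<HMAC secret key>")))
                .withEndpointConfiguration(
                        new EndpointConfiguration("https://storage.googleapis.com", "auto"))
                .build();

        try {
            // The same existence check the connector runs when the task starts
            // (S3SinkTask.start in the stack trace above).
            boolean exists = s3.doesBucketExistV2("my-gcs-bucket-test-s3-sink-example");
            System.out.println("doesBucketExistV2 returned " + exists);
        } catch (AmazonS3Exception e) {
            // Against GCS this surfaces the 400 InvalidArgument seen in the connector logs.
            System.out.println("doesBucketExistV2 failed: " + e.getErrorCode()
                    + " (HTTP " + e.getStatusCode() + ")");
        }
    }
}

As far as I can tell, doesBucketExistV2 verifies the bucket by sending a GetBucketAcl request, and that appears to be the request the GCS XML API rejects, while plain object reads and writes with the same credentials go through fine.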

I’ve opened an issue on the GitHub repo: connector is not interoperable with other s3-compliant cloud storage providers · Issue #747 · confluentinc/kafka-connect-storage-cloud · GitHub


This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.