Hi everyone,
I’m making an S3 sink connector and what I’m trying to achieve is that every morning at 04:00 AM UTC I wanna that S3 sink connector dump data into a new folder ( start a new day)
Example:
The current date & time is: 06-09-2022 01:00 am UTC
Kafka S3 sink connector needs to dump data:
- prefix/topic/year=2022/month=06/day=09/*.json
The current date & time is: 06-09-2022 04:00 am UTC
Kafka S3 sink connector needs to dump data into a new folder ( new day )
- prefix/topic/year=2022/month=06/day=10/*.json
Using known timezone ID’s this is not possible to achieve. I found out that it is possible to use custom offset but when I tried to use it I’m getting bad request when trying to create S3 sink connector.
Here is S3 sink config:
curl --location --request POST 'localhost:8083/connectors' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "sink-s3-testing",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"topics": "topic_name",
"s3.region": "us-east-1",
"s3.bucket.name": "my-bucket",
"aws.accessKeyId": "mock-key",
"aws.secretKey": "mock-key",
"data.format": "AVRO",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.registry.url": "schema_link",
"value.converter.schema.registry.url": "schema_link",
"flush.size": 1,
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.proxy.url": "http://localstack:4572",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"locale": "en-US",
"timezone": "\"+24:00\"",
"timestamp.extractor": "Wallclock"
}
}'
error message:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value \"+24:00\" for configuration timezone: The datetime zone id '\"+24:00\"' is not recognised\nYou can also find the above list of errors at the endpoint
/connector-plugins/{connectorType}/config/validate"}%