S3 Sink Connector - Small File Creation Issue

Hi all,

I hope this message finds you well. I'm Prashant, and I'm reaching out to discuss an issue we've encountered while using AWS MSK Connect (AWS's managed Kafka Connect offering) together with the S3 sink connector for one of our use cases. The plan is to trial this setup and eventually move to Confluent Cloud in the next 3-4 months.

In this scenario, we are reading data from our self-managed Kafka using AWS MSK Connect and then sinking the data to S3. While we’ve been able to overcome multiple challenges along the way, we’re currently grappling with the “small file creation” issue.

We've configured the S3 sink connector with the following parameters, which should ideally result in fairly large files in the range of 80 to 100MB, which is what we desire. This is essential from a performance point of view, because we are pushing a substantial amount of data, approximately 50 to 60K messages per second, to Kafka. However, despite our efforts to tweak these configurations in various ways, the files written by MSK Connect to S3 are consistently in the range of 2 to 3MB at most.
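For reference, here is the rough sizing math behind the 80-100MB expectation, as a back-of-envelope sketch in Python. The average record size and compression ratio are illustrative assumptions, not measured values:

# Back-of-envelope sizing behind the 80-100MB target.
# avg_record_bytes and gzip_ratio are illustrative assumptions.
flush_size = 5_000_000        # records per file (flush.size below)
avg_record_bytes = 100        # assumed average JSON record size
gzip_ratio = 0.18             # assumed ~5-6x compression at gzip level 6

raw_mb = flush_size * avg_record_bytes / 1e6
print(f"raw: ~{raw_mb:.0f} MB, gzipped: ~{raw_mb * gzip_ratio:.0f} MB")
# raw: ~500 MB, gzipped: ~90 MB -- roughly the file size we are targeting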

Kindly help.

Below are the S3 sink connector configurations; if you need any other information, please feel free to reach out (prashant.chutke@gmail.com).

S3 Sink Connector Configurations

connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=40
topics=<topic_name>
topics.dir=data/

s3.compression.type=gzip
s3.compression.level=6
s3.region=us-west-2
s3.bucket.name=<bucket_name>

flush.size=5000000

storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
locale=en
timezone=UTC

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='date'=YYYYMMdd/'hour'=HH
partition.duration.ms=3600000

rotate.interval.ms=30000
timestamp.extractor=Record

transforms=insertTS,formatTS
transforms.insertTS.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertTS.timestamp.field=kafka_timestamp
transforms.formatTS.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.formatTS.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.formatTS.field=kafka_timestamp
transforms.formatTS.target.type=string

errors.tolerance=all
errors.deadletterqueue.topic.name=<dlq_topic_name>
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
errors.retry.delay.max.ms=60000
errors.retry.timeout=300000
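
For reference, this is roughly how the resulting object sizes can be sampled, as a minimal boto3 sketch; the bucket name and prefix are placeholders matching the config above:

# Sample object sizes under the sink's output prefix on S3.
# Bucket and prefix are placeholders from the connector config above.
import boto3

s3 = boto3.client("s3", region_name="us-west-2")
paginator = s3.get_paginator("list_objects_v2")

sizes = []
for page in paginator.paginate(Bucket="<bucket_name>", Prefix="data/"):
    for obj in page.get("Contents", []):
        sizes.append(obj["Size"])

if sizes:
    print(f"{len(sizes)} objects, avg {sum(sizes) / len(sizes) / 1e6:.1f} MB, "
          f"max {max(sizes) / 1e6:.1f} MB")
# In our runs this consistently reports only ~2-3 MB per object.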
