Hello community.
I’ve browsed some of the topics on the Kafka Connect small-files problem, but so far I haven’t managed to solve the issue I’m having.
We’re consuming messages from a Kafka topic that receives around 800 kB of data per second. The topic is shared across multiple customers, and so far the issue occurs for only one customer (the one with the highest data volume).
Our goal is to write evenly sized gzipped JSON files to S3, and most of the time this works just fine. However, during some hours of the day (00, 01, 02 and 03) the data “explodes” and we get hundreds of thousands of files ranging from 100 kB to 5 MB.
Initially I thought this might be caused by schema evolution (the customer pushes some tests to the topic at night), so I tuned the compatibility settings, trying BACKWARD_TRANSITIVE and FULL_TRANSITIVE, without any luck. Then I learned that schema compatibility should not even matter for JSON. This is the configuration we’re using:
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"enable.stdout": "true",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"flush.size": "600000",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"locale": "US",
"partition.duration.ms": "3600000",
"rotate.interval.ms": "600000",
"rotate.schedule.interval.ms": "1800000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"s3.acl.canned": "bucket-owner-full-control",
"s3.bucket.name": "raw-bucket",
"s3.part.size": "5242880",
"s3.region": "region",
"s3.compression.type": "gzip",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility": "FULL_TRANSITIVE",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"topics.dir": "topics/test-events",
"tasks.max": "10",
"timestamp.extractor": "Record",
"timezone": "UTC",
"security.protocol": "SSL",
"name": "test_events",
"topics": "test-events"
}
My understanding of the configuration (which I did not create) is:
- flush.size: the maximum number of records written to a single file before it is committed
- partition.duration.ms: as our granularity is one hour, 3600000 sounds correct
- rotate.interval.ms: each file should stay open for 10 minutes accepting records, where the starting timestamp is based on the record timestamp (more on this below)
- rotate.schedule.interval.ms: same as above, but the starting timestamp is based on the system time
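To make that understanding concrete, here is a rough Python sketch of the rotation rules as I read them. It is a toy model of a single task writing a single Kafka partition, not the connector's actual code. The function name `count_files` is made up for illustration, and the rule that a file is also closed when a record falls into a different hourly partition than the open file is an assumption on my part. `rotate.schedule.interval.ms` is left out to keep the model small.

```python
def count_files(record_ts_ms, flush_size, rotate_interval_ms, partition_ms):
    """Count the files a toy writer would produce for a stream of record timestamps.

    Hypothetical model, not the S3 sink connector's implementation.
    A file is closed when:
      * it holds flush_size records, or
      * a record arrives whose timestamp is at least rotate_interval_ms
        later than the first record in the open file, or
      * (assumption) a record belongs to a different time partition
        (here: hour bucket) than the open file.
    """
    files = 0
    open_count = 0      # records in the currently open file
    base_ts = None      # timestamp of the first record in the open file
    open_bucket = None  # time partition of the open file

    for ts in record_ts_ms:
        bucket = ts // partition_ms
        if open_count > 0 and (
            bucket != open_bucket or ts - base_ts >= rotate_interval_ms
        ):
            files += 1          # close the open file before writing this record
            open_count = 0
        if open_count == 0:
            base_ts = ts        # this record starts a new file
            open_bucket = bucket
        open_count += 1
        if open_count >= flush_size:
            files += 1          # size-based commit
            open_count = 0

    if open_count > 0:
        files += 1              # count the file that is still open at the end
    return files


if __name__ == "__main__":
    HOUR_MS = 3_600_000
    # One record per second for one hour, timestamps strictly increasing.
    in_order = list(range(0, HOUR_MS, 1000))
    # Same number of records, but alternating between two different hours.
    interleaved = []
    for i in range(1800):
        interleaved += [i * 1000, 5 * HOUR_MS + i * 1000]

    print(count_files(in_order, 600_000, 600_000, HOUR_MS))
    print(count_files(interleaved, 600_000, 600_000, HOUR_MS))
```

In this model, in-order timestamps give one file per 10-minute window, while record timestamps that jump back and forth between hours close a file on almost every record. If the real connector behaves similarly, records with mixed timestamps (for example the nightly test data) might be worth checking.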
I’m a bit confused about why there are two different rotate intervals. Could this cause any issues? “timestamp.extractor” is set to “Record” (which is what we want), and since we’re getting around 350 messages per second, I would think that 10 minutes would be more than enough time to collect messages. To be honest, I’m clueless here and have no idea what to try next.
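For reference, here is the back-of-the-envelope arithmetic behind that expectation, as a small Python sketch. It assumes the 350 messages per second and 800 kB per second describe the same stream, that 1 kB is 1000 bytes, and that the load is steady; the helper names are made up for illustration.

```python
MSGS_PER_SEC = 350            # approximate message rate from the post
BYTES_PER_SEC = 800_000       # ~800 kB/s, assuming 1 kB = 1000 bytes
FLUSH_SIZE = 600_000          # flush.size
ROTATE_INTERVAL_MS = 600_000  # rotate.interval.ms (10 minutes)


def records_per_interval(msgs_per_sec, interval_ms):
    """Records arriving during one rotate interval at a steady rate."""
    return msgs_per_sec * interval_ms // 1000


def seconds_to_reach_flush_size(msgs_per_sec, flush_size):
    """Time needed to collect flush_size records at a steady rate."""
    return flush_size / msgs_per_sec


def avg_message_bytes(bytes_per_sec, msgs_per_sec):
    """Average uncompressed message size implied by the two rates."""
    return bytes_per_sec / msgs_per_sec


if __name__ == "__main__":
    per_interval = records_per_interval(MSGS_PER_SEC, ROTATE_INTERVAL_MS)
    print("records per 10 minutes:", per_interval)
    print("below flush.size:", per_interval < FLUSH_SIZE)
    print("minutes to reach flush.size:",
          seconds_to_reach_flush_size(MSGS_PER_SEC, FLUSH_SIZE) / 60)
    print("average message size in bytes:",
          avg_message_bytes(BYTES_PER_SEC, MSGS_PER_SEC))
```

At these rates a 10-minute window collects about 210,000 records across the whole topic, which is well below flush.size, so under steady load I would expect the time-based settings rather than flush.size to decide when files are closed.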
Help appreciated, thank you!