S3SinkConnector: message count mismatch on S3

Hi, I have ~130M messages (280 partitions) on Kafka and am writing them to S3 with the config below. My consumer lag is ~100, but when I check S3 I only see ~30M messages.
What could be the problem? There are no errors in the logs.

"config": {
  "connector.class": "S3SinkConnector",
  "topics": "xxxx",
  "s3.bucket.name": "xxx",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "snappy",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "flush.size": "2147483647",
  "s3.region": "xxx",
  "tasks.max": "3",
  "partition.duration.ms": "3600000",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "locale": "en-US",
  "timezone": "UTC",
  "rotate.schedule.interval.ms": "3600000",
  "rotate.interval.ms": "3600000",
  "path.format": "YYYY-MM-dd",
  "timestamp.extractor": "Record",
  "transforms": "ReplaceField",
  "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.ReplaceField.blacklist": "xxxxx"
}

How are you checking the files? Your records will be distributed over many files, across multiple YYYY-MM-dd prefixes. And 3600000 ms is one hour, not one day, so partition.duration.ms does not match your daily path.format.
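If it helps, here is a minimal sketch of how you could count records across every prefix the connector wrote, rather than eyeballing a single date folder. It assumes boto3 and pyarrow are installed; the bucket and prefix names are placeholders, and the prefix follows the default layout of topics/&lt;topic&gt;/&lt;path.format&gt;/.

import io
import boto3
import pyarrow.parquet as pq

BUCKET = "xxx"            # placeholder bucket name
PREFIX = "topics/xxxx/"   # placeholder prefix; adjust to your topics.dir / topic

s3 = boto3.client("s3")
total_rows = 0
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
    for obj in page.get("Contents", []):
        if not obj["Key"].endswith(".parquet"):
            continue
        body = s3.get_object(Bucket=BUCKET, Key=obj["Key"])["Body"].read()
        # Row count comes from the Parquet footer metadata, no full decode needed.
        total_rows += pq.ParquetFile(io.BytesIO(body)).metadata.num_rows

print(f"Total records across all files: {total_rows}")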

Note: You may not want both rotate.schedule.interval.ms and rotate.interval.ms.
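As one possible way to line those settings up (not a definitive fix, just a sketch): if the intent is hourly partitions, an hourly path.format plus a single rotation setting keeps the S3 layout and the flush cadence consistent. The key names below come from the TimeBasedPartitioner / S3 sink configuration; the values are only illustrative.

"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"rotate.schedule.interval.ms": "3600000",
"timestamp.extractor": "Record"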

I just set it up for hourly.
I've talked with the owner of the topic. It seems they just delete messages, but the offset info still shows 130M records, so false alarm!
