S3 Sink Connector writes only 3 messages per file

My S3 bucket has > 500K files, and it's hard to read them with Spark. How can I increase the message count per partition? Below is my config:

"config": {
    "connector.class": "S3SinkConnector",
    "topics": "xxxxxxxxxx",
    "s3.bucket.name": "xxxxxxxxxxxx",
    "s3.part.size":"26214400",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "flush.size": "10000",
    "s3.region": "xxxxxxxx",
    "tasks.max": "3",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "locale": "en-US",
    "timezone": "UTC",
    "rotate.schedule.interval.ms": "3600000",
    "rotate.interval.ms": "60000",
    "path.format": "YYYY-MM-dd",
    "timestamp.extractor": "Record",
    "transforms": "insertTS,formatTS, ReplaceField",
    "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertTS.timestamp.field": "messageTS",
    "transforms.formatTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.formatTS.format": "yyyy-MM-dd HH:mm:ss:SSS",
    "transforms.formatTS.field": "messageTS",
    "transforms.formatTS.target.type": "string",
    "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.ReplaceField.blacklist": "xxxxxxxxxxxxxx"
  }

Your rotate.interval.ms is 60000 ms (60 seconds = 1 minute), so the connector rotates to a new file at least once a minute per topic partition. That's why you've got so many files.
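For illustration, these are the settings that control rotation (the values below are assumptions chosen to show the idea, not tuned recommendations):

    "flush.size": "100000",
    "rotate.interval.ms": "3600000",
    "rotate.schedule.interval.ms": "3600000"

A file is closed by whichever condition fires first: flush.size records accumulated, rotate.interval.ms elapsed according to the extracted record timestamps (your timestamp.extractor is Record), or rotate.schedule.interval.ms of wall-clock time. Note that the connector buffers an open file's pending data in memory until it is uploaded, so fewer, larger files per rotation window generally need a larger heap, which matches the error you see below.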

If I try to increase it to, say, 5 minutes, I get this error: java.lang.OutOfMemoryError: Java heap space

I had to increase the connector's heap size. It's working fine now.
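For anyone hitting the same thing: if the worker runs via the standard Kafka scripts, the heap can be raised with the KAFKA_HEAP_OPTS environment variable before launching it. A minimal sketch, assuming a distributed worker started from the Kafka distribution directory (the heap sizes here are assumptions; size -Xmx to what each open file buffers per rotation window):

    # Assumption: worker launched with the stock script; tune -Xmx to your load
    export KAFKA_HEAP_OPTS="-Xms1g -Xmx4g"
    bin/connect-distributed.sh config/connect-distributed.properties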
