Hi, I’m trying to use Kafka Connect to write data to S3 as Parquet files. The config I’m currently running is this:
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.region": "eu-west-1",
"s3.bucket.name": "Bucket-Name-Here",
"topics.dir": "\b",
"flush.size": "1000000000",
"s3.part.size": "1073741823",
"schema.compatibility": "NONE",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"tasks.max": "6",
"topics": "Topic-Name-Here",
"store.url": "Store-Url",
"key.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
"partition.duration.ms": "3600000",
"path.format": "'\'date\''=YYYY-MM-dd/",
"locale": "sv_SE",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "180000",
"timestamp.extractor": "RecordField",
"timestamp.field": "timestamp",
"timezone": "Europe/Stockholm",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "gzip",
"headers.format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"keys.format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat"
However, it seems that no matter what flush.size or rotate.schedule.interval.ms I set, the files always come out at around 5-8 KB. I would like them to be at least a couple of MB.
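For what it’s worth, this is roughly how I’m checking the sizes of the objects that land in the bucket (a boto3 sketch; the bucket name is the placeholder from the config above):

```python
import boto3

s3 = boto3.client("s3", region_name="eu-west-1")
paginator = s3.get_paginator("list_objects_v2")

# List every object in the sink bucket and print its size.
for page in paginator.paginate(Bucket="Bucket-Name-Here"):
    for obj in page.get("Contents", []):
        # Every Parquet object comes out at roughly 5-8 KB.
        print(f"{obj['Key']}: {obj['Size'] / 1024:.1f} KB")
```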
During my testing, I’ve sent around 300-1000 messages in less than 5 seconds. I’ve also made sure the messages are sent to Kafka with the exact same (hardcoded) timestamp.
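A stripped-down sketch of my test producer (heavily simplified: the real one serializes a Protobuf message through Schema Registry, and the same hardcoded value also goes into the message’s timestamp field that the RecordField extractor reads; the broker address, payload, and timestamp value below are placeholders):

```python
from kafka import KafkaProducer

FIXED_TS_MS = 1700000000000   # the hardcoded timestamp (example value)

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # placeholder broker

for i in range(1000):
    producer.send(
        "Topic-Name-Here",
        key=str(i).encode("utf-8"),
        value=b"payload",          # stand-in; the real value is a Protobuf message
                                   # whose timestamp field also carries FIXED_TS_MS
        timestamp_ms=FIXED_TS_MS,  # same record timestamp for every message
    )

producer.flush()
```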
Is there an option I’m missing, or have I misconfigured something?