Kafka Connect S3SinkConnector: gzipped TSV format

I need to sink Kafka messages to S3 via Kafka Connect, but in gzipped TSV format.
I'm wondering what config to use in the following request:

curl -i -X PUT \
    -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    http://localhost:8083/connectors/s3-kafka-sink/config \
    -d '
    {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "tasks.max": "1",
        "topics": "snowplow_enriched_good",
        "s3.region": "eu-west-1",
        "s3.bucket.name": "snowplow-enrich",
        "flush.size": "65536",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "schema.compatibility": "NONE",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "transforms": "AddMetadata",
        "transforms.AddMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.AddMetadata.offset.field": "_offset",
        "transforms.AddMetadata.partition.field": "_partition"
    }
'

I copied the sample above from demo-scene.

You will want s3.compression.type=gzip. However, StringFormat is only available for the HDFS sink, not the S3 sink, so you'd need to build the S3 sink from source yourself, with that format class ported into its codebase.

Overall, I'd suggest JSON Lines over TSV/CSV.
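
If you go the JSON Lines route, a minimal sketch of the change would be to swap the format.class line in your config for the S3 sink's JSON format and add the compression setting. Everything else in your request stays as it is. This assumes your configured value converter hands the connector data that JsonFormat can serialize, which I haven't verified against your topic:

```json
{
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "s3.compression.type": "gzip"
}
```

With these two keys merged into the existing config, the objects written to S3 should be gzip-compressed files with one JSON record per line.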
