i want to write records for all of my topics for a source connector using single s3 sink connector.
i m able to achieve this using topics.regex
but i m facing 2 issues -
1 i m not able to map datasource name → {schema}/{table} using topics.dir
2 i m not able to filter few topics e.g. - heartbeat topic data, i dont want heartbeat table records in my s3 bucket.
could you pls help with the appropriate topic router and filter syntax
thanks.
current config of the connector – its not working as per requirements.
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "ap-south-1",
"topics.dir": "0/single_sink/metis/${topic}",
"flush.size": "10000",
"tasks.max": "1",
"s3.part.size": "67108864",
"timezone": "Asia/Calcutta",
"rotate.interval.ms": "60000",
"locale": "en_GB",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"s3.bucket.name": "zeta-aws-aps1-metis-0-s3-pvt",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"partition.duration.ms": "86400000",
"schema.compatibility": "NONE",
"topics.regex": "cdc_0_metis_metis_0_pgdb.*",
"parquet.codec": "gzip",
"connect.meta.data": "true",
"parquet.avro.write-old-list-structure": "false",
"value.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"name": "cdc_0_metis_metis_0_pgdb.single_sink-aws-aps1-metis-0-s3-pvt_ap-south-1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"path.format": "'date'=YYYY-MM-dd",
"rotate.schedule.interval.ms": "180000",
"timestamp.extractor": "RecordField",
"key.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
"timestamp.field": "cdc_source_ts_ms",
"transforms": "filter",
"transforms.filter.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.filter.regex": "cdc_0_metis_metis_0_pgdb.metis.cdc_heartbeat",
"transforms.filter.replacement": "",
"errors.tolerance": "all",
"partition.field.name": "topic"
}