Filter topics based on topic name while using topics.regex for S3 sink connector

I want to write the records from all of a source connector's topics using a single S3 sink connector.

I'm able to achieve this using topics.regex,

but I'm facing 2 issues:
1. I'm not able to map the data source name → {schema}/{table} using topics.dir.
2. I'm not able to filter out a few topics, e.g. the heartbeat topic. I don't want heartbeat table records in my S3 bucket.

Could you please help with the appropriate topic router and filter syntax?

Thanks.

Current config of the connector (it's not working as per the requirements):

{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "ap-south-1",
    "topics.dir": "0/single_sink/metis/${topic}",
    "flush.size": "10000",
    "tasks.max": "1",
    "s3.part.size": "67108864",
    "timezone": "Asia/Calcutta",
    "rotate.interval.ms": "60000",
    "locale": "en_GB",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "s3.bucket.name": "zeta-aws-aps1-metis-0-s3-pvt",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "partition.duration.ms": "86400000",
    "schema.compatibility": "NONE",
    "topics.regex": "cdc_0_metis_metis_0_pgdb.*",
    "parquet.codec": "gzip",
    "connect.meta.data": "true",
    "parquet.avro.write-old-list-structure": "false",
    "value.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "name": "cdc_0_metis_metis_0_pgdb.single_sink-aws-aps1-metis-0-s3-pvt_ap-south-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'date'=YYYY-MM-dd",
    "rotate.schedule.interval.ms": "180000",
    "timestamp.extractor": "RecordField",
    "key.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
    "timestamp.field": "cdc_source_ts_ms",
    "transforms": "filter",
    "transforms.filter.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.filter.regex": "cdc_0_metis_metis_0_pgdb.metis.cdc_heartbeat",
    "transforms.filter.replacement": "",
    "errors.tolerance": "all",
    "partition.field.name": "topic"
}

@OneCricketeer could you please help? I saw some similar discussions on Stack Overflow.

RegexRouter is for renaming topics, not filtering them.

Use the Filter transform with a predicate for filtering.

Could you please share an example? Thanks.

{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "ap-south-1",
    "topics.dir": "0/single_sink/metis/${topic}",
    "flush.size": "10000",
    "tasks.max": "1",
    "s3.part.size": "67108864",
    "timezone": "Asia/Calcutta",
    "rotate.interval.ms": "60000",
    "locale": "en_GB",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "s3.bucket.name": "zeta-aws-aps1-metis-0-s3-pvt",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "partition.duration.ms": "86400000",
    "schema.compatibility": "NONE",
    "topics.regex": "cdc_0_metis_metis_0_pgdb.*",
    "parquet.codec": "gzip",
    "connect.meta.data": "true",
    "parquet.avro.write-old-list-structure": "false",
    "value.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "name": "cdc_0_metis_metis_0_pgdb.single_sink-aws-aps1-metis-0-s3-pvt_ap-south-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'date'=YYYY-MM-dd",
    "rotate.schedule.interval.ms": "180000",
    "timestamp.extractor": "RecordField",
    "key.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
    "timestamp.field": "cdc_source_ts_ms",
    "partition.field.name": "topic",
    "transforms": "filter,predicate",
    "transforms.filter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.predicate.type": "org.apache.kafka.connect.transforms.Predicate",
    "transforms.predicate.predicate": "if (record.topic() == 'heartbeat_topic') false else true",
    "transforms.filter.condition": "if (value == null) false else true"
}

I tried this, but it's not working.

predicates is its own top-level config, not something to put in the transforms list.

I don’t have an example because I’ve not done what you’re trying to do. I’ve also never used topics.regex and only used an explicit list of topics.
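That said, going by the Kafka Connect docs (Filter and predicates were added in Apache Kafka 2.6), the general shape would be roughly the untested sketch below. The alias names dropHeartbeat and isHeartbeat are made up for illustration, and the heartbeat topic name is taken from your first config. Filter drops every record for which the predicate is true, so no negate is needed here:

```json
{
    "transforms": "dropHeartbeat",
    "transforms.dropHeartbeat.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.dropHeartbeat.predicate": "isHeartbeat",
    "predicates": "isHeartbeat",
    "predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isHeartbeat.pattern": "cdc_0_metis_metis_0_pgdb\\.metis\\.cdc_heartbeat"
}
```

These properties would replace the transforms.* entries in the sink config; everything else stays as it was.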

Thanks man, it's working now.

{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "ap-south-1",
    "topics.dir": "0/single_sink/metis/${topic}",
    "flush.size": "10000",
    "tasks.max": "1",
    "s3.part.size": "67108864",
    "timezone": "Asia/Calcutta",
    "rotate.interval.ms": "60000",
    "locale": "en_GB",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "s3.bucket.name": "zeta-aws-aps1-metis-0-s3-pvt",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "partition.duration.ms": "86400000",
    "schema.compatibility": "NONE",
    "topics.regex": "cdc_0_metis_metis_0_pgdb.*",
    "parquet.codec": "gzip",
    "connect.meta.data": "true",
    "parquet.avro.write-old-list-structure": "false",
    "value.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "name": "cdc_0_metis_metis_0_pgdb.single_sink-aws-aps1-metis-0-s3-pvt_ap-south-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'date'=YYYY-MM-dd",
    "rotate.schedule.interval.ms": "180000",
    "timestamp.extractor": "RecordField",
    "key.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
    "timestamp.field": "cdc_source_ts_ms",
    "transforms": "Filter",
    "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.Filter.predicate": "IsFoo",
    "transforms.Filter.negate": "true",
    "predicates": "IsFoo",
    "predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsFoo.pattern": "cdc_0_metis_metis_0_pgdb.metis.*,cdc_0_metis_metis_0_pgdb.transactions"
}

It's able to block the unwanted topics.
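One thing worth double-checking in the pattern above: as far as I know, TopicNameMatches treats pattern as a single Java regular expression that must match the whole topic name, not as a comma-separated list. Since topic names cannot contain a comma, the pattern as written may never match, and with negate set to true that would drop every record rather than only the unwanted ones. A sketch of the same allow-list written as one regex with alternation, assuming the intent is to keep the metis.* topics and the transactions topic:

```json
{
    "transforms": "Filter",
    "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.Filter.predicate": "IsFoo",
    "transforms.Filter.negate": "true",
    "predicates": "IsFoo",
    "predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsFoo.pattern": "cdc_0_metis_metis_0_pgdb\\.(metis\\..*|transactions)"
}
```

Note that the metis\\..* branch would still match cdc_0_metis_metis_0_pgdb.metis.cdc_heartbeat, so the heartbeat topic would need its own exclusion if it lives in that schema.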


Could you please help with the first requirement as well?

I want the files to be saved under a custom path: 0/single_sink/metis/{schema}/{table}
So I added it to topics.dir, but I'm not able to get the files at the desired path.

"topics.dir": "0/single_sink/metis/${topic}",

With this, it writes to this literal path → 0/single_sink/metis/${topic}

and ${topic} is not replaced with the topic name.
My topic name has the form {connectorName}.{schema}.{table}

Is there a way to get {schema}.{table} from the topic name and put it into topics.dir?
@OneCricketeer
Thanks.

The S3 bucket path cannot be changed without forking the S3 sink code.
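For context, as far as I understand the S3 sink documentation, objects are written as <topics.dir>/<topic>/<encoded partition>/<topic>+<kafkaPartition>+<startOffset>.<format>. topics.dir is a static prefix with no placeholder substitution, and the full topic name is appended after it automatically. A fragment of the config under that assumption, with ${topic} removed from the prefix:

```json
{
    "topics.dir": "0/single_sink/metis",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'date'=YYYY-MM-dd"
}
```

With this, records from a topic named cdc_0_metis_metis_0_pgdb.{schema}.{table} should land under 0/single_sink/metis/cdc_0_metis_metis_0_pgdb.{schema}.{table}/date=.../, so the dotted topic name stays a single path segment rather than becoming {schema}/{table}.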