How can I best dynamically read all events from all topics

I am creating a consumer component (.NET) for my Kafka cluster. It has to dynamically read all events from all topics, including topics that are created later. I have found a solution that uses the AdminClient's metadata to fetch all topics every hour or so, but I think there should be a better way to do this.
Does Confluent have something that covers this or is there a better way to do it?

Thanks in advance!

Hi there! I’d love to hear more about your use case and understand why you’d like to read all events from all topics.

Using the AdminClient to capture metadata and subscribe dynamically from there would be your best bet, I think.
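As a rough sketch of that idea in the .NET client (Confluent.Kafka) — the broker address, group id, and five-minute refresh interval below are placeholder assumptions — you could poll the AdminClient for the topic list on a timer and re-subscribe whenever new topics show up:

using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

const string bootstrap = "localhost:9092";   // assumption: replace with your brokers

using var adminClient = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = bootstrap }).Build();

using var consumer = new ConsumerBuilder<Ignore, byte[]>(
    new ConsumerConfig
    {
        BootstrapServers = bootstrap,
        GroupId = "audit-consumer",          // hypothetical group id
        AutoOffsetReset = AutoOffsetReset.Earliest
    }).Build();

var subscribed = new HashSet<string>();
var lastRefresh = DateTime.MinValue;

while (true)
{
    // Refresh the topic list periodically instead of only once at startup.
    if (DateTime.UtcNow - lastRefresh > TimeSpan.FromMinutes(5))
    {
        var topics = adminClient.GetMetadata(TimeSpan.FromSeconds(10)).Topics
            .Select(t => t.Topic)
            .Where(t => !t.StartsWith("__"))  // skip internal topics
            .ToHashSet();

        if (!topics.SetEquals(subscribed))
        {
            consumer.Subscribe(topics);       // replaces the previous subscription
            subscribed = topics;
        }
        lastRefresh = DateTime.UtcNow;
    }

    var result = consumer.Consume(TimeSpan.FromSeconds(1));
    if (result != null)
    {
        // Hand the audit event off to your processing/validation logic here.
        Console.WriteLine($"{result.Topic} [{result.Partition}] @ {result.Offset}");
    }
}

This is essentially your hourly AdminClient poll with a shorter interval; note that the consumer group rebalances each time the subscription changes.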

Hey! Thanks for your response 🙂
My use case is finding out whether we can use our Kafka streams to implement a central audit-trail component within our decentralized landscape.
At the moment, audit trails are not being saved by every component in our landscape. We do have a Kafka cluster with ZooKeeper and Schema Registry, and all components produce audit messages to it. The messages must contain a minimum amount of information to qualify as an audit message (enforced with Avro schemas). Before being saved, the messages should also be checked for correctness (maybe using Kafka Streams?), and after they are checked they should be saved in MongoDB (Kafka Connect?). So all components need to produce to our Kafka cluster, and all of their messages need to be checked and saved. Therefore the component that does the checking and saving (Kafka Connect or a new component) needs to consume and process all incoming messages from all topics, even topics that are created later.
What do you think is the best way to realize this?
Thanks in advance!

Hi, are there any updates?
I have used Kafka Connect with topics.regex to get all topics, but my next problem is that this is not dynamic: if new topics are created, the connector will not subscribe to them.

The messages must contain a minimum amount of information to qualify as an audit message (enforced with Avro schemas). Before being saved, the messages should also be checked for correctness.

Could you tell me a bit more about what it is you mean by “checked”? Is this some specific business logic beyond the Avro schema validation?

Using the MongoDB Sink Connector is a great option for your pipeline so that you don’t reinvent the wheel. Unfortunately topics.regex is only evaluated when the connector starts up. In order for it to capture new topics, you would need to restart the connector regularly. Do you have that flexibility? You could restart the connector daily to get a new list of topics to read from.
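If you go that route, one simple option (a sketch, assuming the Connect worker's REST API is reachable on localhost:8083 and the connector is named mongodb-audit-sink) is to hit the restart endpoint from a scheduled job:

curl -X POST http://localhost:8083/connectors/mongodb-audit-sink/restart

On restart, topics.regex is re-evaluated and any newly created matching topics get picked up.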

To do your validation step, you could define a custom transform (SMT) on the sink connector, so that messages flow through that transform before being sent to MongoDB.
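Wiring in such a transform is just configuration on the connector. For example (a sketch, where com.example.audit.ValidateAuditFields is a hypothetical SMT class you would implement and place on the Connect worker's plugin path):

    "transforms": "ValidateAudit",
    "transforms.ValidateAudit.type": "com.example.audit.ValidateAuditFields"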

@danicafine
The checking is indeed beyond Avro schema validation.
Right now I have the check working using the Confluent Filter SMT. The topics.regex also works for retrieving all the topics, and luckily the connector restarts itself every couple of minutes.
My next challenge is getting the header values into the MongoDB sink connector.
The Kafka messages contain Kafka headers (byte arrays), and I want to save their values (decoded to JSON) in my MongoDB sink. I have found a third-party library that does this, but the saved header values end up as Base64 strings; they should be saved decoded (as JSON). So is there a way to retrieve the Kafka headers and save them (decoded) in my MongoDB sink?
Thanks in advance!

I'm trying to read all the topics using the S3 sink connector, with topics.regex.

But I'm facing two issues:
1. I'm not able to map the data source name to {schema}/{table} using topics.dir.
2. I'm not able to filter out a few topics, e.g. the heartbeat topic; I don't want heartbeat table records in my S3 bucket.

Can someone please help me apply the appropriate config changes?

This is the current config I am trying, but it's not working.

{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "ap-south-1",
    "topics.dir": "0/single_sink/metis/${topic}",
    "flush.size": "10000",
    "tasks.max": "1",
    "s3.part.size": "67108864",
    "timezone": "Asia/Calcutta",
    "rotate.interval.ms": "60000",
    "locale": "en_GB",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "s3.bucket.name": "zeta-aws-aps1-metis-0-s3-pvt",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "partition.duration.ms": "86400000",
    "schema.compatibility": "NONE",
    "topics.regex": "cdc_0_metis_metis_0_pgdb.*",
    "parquet.codec": "gzip",
    "connect.meta.data": "true",
    "parquet.avro.write-old-list-structure": "false",
    "value.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "name": "cdc_0_metis_metis_0_pgdb.single_sink_zeta-aws-aps1-metis-0-s3-pvt_ap-south-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'date'=YYYY-MM-dd",
    "rotate.schedule.interval.ms": "180000",
    "timestamp.extractor": "RecordField",
    "key.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
    "timestamp.field": "cdc_source_ts_ms",
    "transforms": "Filter",
    "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.Filter.predicate": "IsFoo",
    "transforms.Filter.negate": "true",
    "predicates": "IsFoo",
    "predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsFoo.pattern": "cdc_0_metis_metis_0_pgdb.metis.cdc_heartbeat,cdc_0_metis_metis_0_pgdb.metis.cdc_signal,cdc_0_metis_metis_0_pgdb.transactions"
}

@danicafine could you please help?

I have created an issue here.

Can anyone please help?

The topic name should be automatically appended after topics.dir, so you don't need to include a template variable like ${topic} in that string.
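For the filtering issue, note that the pattern of the TopicNameMatches predicate is a single Java regular expression, not a comma-separated list of topic names. A sketch of both changes, keeping the rest of your config as-is:

    "topics.dir": "0/single_sink/metis",
    "predicates.IsFoo.pattern": "cdc_0_metis_metis_0_pgdb\\.metis\\.cdc_heartbeat|cdc_0_metis_metis_0_pgdb\\.metis\\.cdc_signal|cdc_0_metis_metis_0_pgdb\\.transactions"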