In our use case, Kafka Streams is not allowed to auto-create topics on the cluster; we are required to create and delete the topics with a separate process.
I tried extracting a topic list from the topology itself, but the changelog topics, for example, are not included there.
Now my question is: can we manually generate a list of all topics that Kafka Streams will create on the Kafka cluster?
Or is it somehow possible to prescribe the topic names, so that we know in advance which topics will be required?
Thanks for your response!
You can check `topology.describe()`, which should include all internal topics. For example:

```
Source: KSTREAM-SOURCE-0000000000 (topics: [in])
Processor: KSTREAM-KEY-SELECT-0000000001 (stores: )
Processor: KSTREAM-FILTER-0000000005 (stores: )
Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)
Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])
Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
Processor: KTABLE-TOSTREAM-0000000007 (stores: )
Processor: KSTREAM-KEY-SELECT-0000000008 (stores: )
Sink: KSTREAM-SINK-0000000009 (topic: out)
```
In the parentheses you find the input topics, output topics, repartition topics, and the names of all stores. The repartition topics need to be prefixed with `<application.id>-` to get the full topic name. From the store names you get the changelog topics by prefixing them with `<application.id>-` and adding the suffix `-changelog`. So for the above example, if `application.id` is `Test`, you would get:

```
Test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition
Test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
```

Note that you will have to redo this every time you change the topology. If at all possible, you should try to whitelist topics of the form `<application.id>-*`.
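A small sketch of that derivation (the class and method names here are hypothetical helpers, not part of the Kafka Streams API): given the `application.id` and the names taken from the `topology.describe()` output, the full internal topic names can be built like this:

```java
// Hypothetical helper (not part of the Kafka Streams API): derives the
// full internal topic names from the application.id and the names shown
// in the output of topology.describe().
public class InternalTopicNames {

    // Repartition topics: "<application.id>-<repartition name>"
    public static String repartitionTopic(String applicationId, String repartitionName) {
        return applicationId + "-" + repartitionName;
    }

    // Changelog topics: "<application.id>-<store name>-changelog"
    public static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String appId = "Test";
        // Names taken from the describe() output above.
        System.out.println(repartitionTopic(appId,
                "KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition"));
        System.out.println(changelogTopic(appId,
                "KSTREAM-AGGREGATE-STATE-STORE-0000000002"));
    }
}
```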
In addition to the response from @lbrutschy - you can also provide names for all of the internal topics Kafka Streams will generate. This way, you’ll get deterministic names that won’t change when adding or removing processors in your topology. Here’s a tutorial that describes how to implement naming in the topology.
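For illustration, a minimal sketch of such a named topology (the topic and store names here are made up; `Grouped.as` and `Materialized.as` are the standard Kafka Streams naming hooks, and the snippet requires the kafka-streams dependency):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;

// Sketch: naming the internal resources so the generated internal topic
// names are deterministic. With application.id "Test" this would yield
// "Test-words-grouped-repartition" and "Test-word-counts-changelog".
public class NamedTopologyExample {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("in")
               .selectKey((key, value) -> value)
               // Names the repartition topic
               .groupByKey(Grouped.as("words-grouped"))
               // Names the store, and thereby its changelog topic
               .count(Materialized.as("word-counts"))
               .toStream()
               .to("out");
        return builder.build();
    }
}
```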
Late response, but thank you both @bbejeck and @lbrutschy.
We resolved it by configuring an `applicationId` for all relevant bindings using Spring Boot.
This is how we are able to run multiple Kafka Streams applications on a single cluster and still know for sure which topic belongs to which application.
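For reference, a sketch of what that can look like with the Spring Cloud Stream Kafka Streams binder (the function names here are made up, and you should verify the exact property names against your binder version):

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            functions:
              # One applicationId per function; Kafka Streams then prefixes
              # all internal topics of that function with its id.
              processOrders:
                applicationId: orders-processor
              processPayments:
                applicationId: payments-processor
```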