Manually generate (internal) topic list based on topology

Hi everyone,

In our use case it is not possible to have Kafka Streams create the topics automatically.
The topics have to be created and deleted by a separate process.

I tried extracting a topic list from the topology itself, but the changelog topics, for example, are not included there.

Now my question is: can we manually generate a list of all the topics that Kafka Streams will create on the Kafka cluster?

Or is it somehow possible to specify the topic names up front, so that we know in advance which topics will be required?

Thanks for your response!

You can check topology.describe(), which should include all internal topics. For example:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [in])
      --> KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000001
    Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)
      <-- KSTREAM-FILTER-0000000005

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
      --> KTABLE-TOSTREAM-0000000007
      <-- KSTREAM-SOURCE-0000000006
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KTABLE-TOSTREAM-0000000007
    Sink: KSTREAM-SINK-0000000009 (topic: out)
      <-- KSTREAM-KEY-SELECT-0000000008
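
For reference, here is a minimal sketch of how such a description can be printed. The topology below is an assumed reconstruction of the one described above, not the original code:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class DescribeTopology {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Roughly the topology from the output above: re-key, count, re-key again, write out.
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
               .selectKey((key, value) -> value)
               .groupByKey()
               .count()
               .toStream()
               .selectKey((key, count) -> key.toUpperCase())
               .to("out", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();
        // Prints the sub-topologies with all sources, processors, sinks, and store names.
        System.out.println(topology.describe());
    }
}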

In the parentheses you find the input topics, output topics, repartition topics, and the names of all stores. To get the full name of a repartition topic, you need to prefix it with <application.id>-. From the store names you get the changelog topics by prefixing them with <application.id>- and adding the suffix -changelog. So for the above example, if application.id is Test, you would get:

in
Test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition
Test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
out
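
If you prefer to derive the list programmatically instead of reading the printed description, a sketch along these lines works against the TopologyDescription API. The -repartition suffix check and the changelog naming follow the convention described above and are assumptions of this sketch, not something the API exposes directly:

import java.util.Set;
import java.util.TreeSet;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

public class TopicLister {

    // Collects the external topics plus the internal topics Kafka Streams will create.
    // Global stores, if any, are not handled here.
    public static Set<String> allTopics(Topology topology, String applicationId) {
        Set<String> topics = new TreeSet<>();
        TopologyDescription description = topology.describe();

        for (TopologyDescription.Subtopology subtopology : description.subtopologies()) {
            for (TopologyDescription.Node node : subtopology.nodes()) {
                if (node instanceof TopologyDescription.Source) {
                    Set<String> sourceTopics = ((TopologyDescription.Source) node).topicSet();
                    if (sourceTopics != null) { // null when the source subscribes via a Pattern
                        for (String topic : sourceTopics) {
                            topics.add(qualify(topic, applicationId));
                        }
                    }
                } else if (node instanceof TopologyDescription.Sink) {
                    String topic = ((TopologyDescription.Sink) node).topic();
                    if (topic != null) { // null when a TopicNameExtractor is used
                        topics.add(qualify(topic, applicationId));
                    }
                } else if (node instanceof TopologyDescription.Processor) {
                    for (String store : ((TopologyDescription.Processor) node).stores()) {
                        topics.add(applicationId + "-" + store + "-changelog");
                    }
                }
            }
        }
        return topics;
    }

    // Repartition topics appear in the description without the application.id prefix.
    private static String qualify(String topic, String applicationId) {
        return topic.endsWith("-repartition") ? applicationId + "-" + topic : topic;
    }
}

Note that this assumes every store is changelog-backed; a store created with logging disabled will not get a changelog topic.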

Note that you will have to redo this every time you change the topology. If at all possible, you should try to whitelist topics of the form <application.id>-.*-repartition and <application.id>-.*-changelog.
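
For instance, a pattern along the following lines (purely illustrative, using the application.id Test from above) would match both kinds of internal topics:

import java.util.regex.Pattern;

public class InternalTopicMatcher {
    public static void main(String[] args) {
        String applicationId = "Test"; // the application.id from the example above
        Pattern internalTopics =
                Pattern.compile(Pattern.quote(applicationId) + "-.*-(repartition|changelog)");

        // Prints "true" for the changelog topic from the example list.
        System.out.println(internalTopics
                .matcher("Test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog")
                .matches());
    }
}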

In addition to the response from @lbrutschy: you can also provide names for all of the internal topics Kafka Streams will generate. This way you get deterministic names that won't change when you add or remove processors in your topology. Here's a tutorial that describes how to implement naming in the topology.
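
As a brief sketch of what that looks like (the names used here are illustrative, not taken from the tutorial), naming the grouping and the state store pins the corresponding internal topic names:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class StableTopicNames {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
               .selectKey((key, value) -> value)
               // "aggregate" pins the repartition topic: <application.id>-aggregate-repartition
               .groupByKey(Grouped.as("aggregate"))
               // "counts-store" pins the changelog topic: <application.id>-counts-store-changelog
               .count(Materialized.as("counts-store"))
               .toStream()
               .to("out", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}

With application.id Test this yields Test-aggregate-repartition and Test-counts-store-changelog instead of the generated KSTREAM-... names.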

Late response, but thank you both @bbejeck and @lbrutschy.
We resolved it by configuring an applicationId for all relevant bindings using Spring Boot.

This is how we are able to run multiple Kafka Streams applications on a single cluster and still know for sure which topic belongs to which application.

Best regards