Hi everyone,
In our use case, it is not possible to let Kafka Streams auto-create topics.
The topics have to be created and deleted by a separate process.
I tried extracting a topic list from the topology itself, but the changelog topics, for example, are not included there.
So my question is: can we manually generate a list of all topics that Kafka Streams will create on the Kafka cluster?
Or is it somehow possible to prescribe the topic names, so that we know in advance which topics are required?
Thanks for your response!
You can check topology.describe(), which should include all internal topics. For example:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [in])
      --> KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000001
    Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)
      <-- KSTREAM-FILTER-0000000005
   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
      --> KTABLE-TOSTREAM-0000000007
      <-- KSTREAM-SOURCE-0000000006
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KTABLE-TOSTREAM-0000000007
    Sink: KSTREAM-SINK-0000000009 (topic: out)
      <-- KSTREAM-KEY-SELECT-0000000008
In the parentheses, you find the input topics, output topics, repartition topics, and the names of all stores. To get the full name of a repartition topic, you need to prefix it with <application.id>-. From the store names you get the changelog topics by prefixing them with <application.id>- and adding the suffix -changelog. So for the above example, if application.id is Test, you would get:
in
Test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition
Test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
out
Note that you will have to redo this every time you change the topology. If at all possible, you should instead try to whitelist topics of the form <application.id>-.*-repartition and <application.id>-.*-changelog.
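The prefix/suffix rules above are easy to encode. Here is a minimal sketch in Java (the class and method names are my own, not a Kafka Streams API):

```java
// Sketch: derive full internal topic names from the application.id and the
// names printed by topology.describe(). Class and method names are illustrative.
public class InternalTopicNames {

    // describe() already shows repartition topics with the "-repartition"
    // suffix; only the "<application.id>-" prefix is missing.
    static String repartitionTopic(String applicationId, String nameFromDescribe) {
        return applicationId + "-" + nameFromDescribe;
    }

    // Changelog topics are derived from store names: prefix with
    // "<application.id>-" and append "-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String appId = "Test";
        System.out.println(repartitionTopic(appId,
                "KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition"));
        // -> Test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition
        System.out.println(changelogTopic(appId,
                "KSTREAM-AGGREGATE-STATE-STORE-0000000002"));
        // -> Test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
    }
}
```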
In addition to the response from @lbrutschy: you can also provide names for all of the internal topics Kafka Streams will generate. That way you get deterministic names that won't change when you add or remove processors in your topology. Here's a tutorial that describes how to implement naming in the topology.
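For example, a naming sketch for an aggregation like the one above, using the Grouped.as() and Materialized.as() naming hooks of the Kafka Streams DSL (topic and store names here are illustrative):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;

public class NamedTopology {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("in")
               .selectKey((key, value) -> value)
               // Names the repartition topic: <application.id>-by-value-repartition
               .groupByKey(Grouped.as("by-value"))
               // Names the store, and hence the changelog topic:
               // <application.id>-value-counts-changelog
               .count(Materialized.as("value-counts"))
               .toStream()
               .to("out");
        // Print the topology to verify the stable names.
        System.out.println(builder.build().describe());
    }
}
```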
Late response, but thank you both, @bbejeck and @lbrutschy.
We resolved it by setting an applicationId for all relevant bindings using Spring Boot.
That way we are able to run multiple Kafka Streams applications on a single cluster and still know for sure which topic belongs to which application.
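For reference, with the Spring Cloud Stream Kafka Streams binder, a per-binding applicationId looks roughly like this (the binding and application names here are illustrative):

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            process-in-0:
              consumer:
                # Each binding gets its own application.id, so all internal
                # topics are prefixed predictably per application.
                applicationId: orders-aggregator
            enrich-in-0:
              consumer:
                applicationId: orders-enricher
```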
Best regards