Missing partitions in global stream thread

We have a problem when using a global store that looks like a bootstrap problem.

The data to be stored in the global store is coming from a Kafka topic, but we need to process (transform) it before storing.

As noted in [KAFKA-7663] Custom Processor supplied on addGlobalStore is not used when restoring state from topic - ASF JIRA this cannot be done directly:

  • read from topic → transform → store in global store (doesn’t work!)

Hence we introduced an intermediate topic:

  1. read from topic → transform → write to intermediate topic
  2. read from intermediate topic → store (unchanged) in global store

The problem we have now is that the intermediate topic is not there yet when the global store is initialized in the global stream thread. More concretely we see a

org.apache.kafka.streams.errors.StreamsException: There are no partitions available for topic <name of intermediate topic> when initializing global store <name of global store>

Is there any way to postpone this initialization of the global store to the moment after the intermediate topic (and its partitions) are created?

For the pattern you describe, you should create the topic manually before you start you program. In the end, it’s considered a user topic (cf: Managing Streams Application Topics | Confluent Documentation).

How do you create the topic atm?

1 Like

I tried both

  • kstream.to(…)
  • kstream.repartition(…)

At least in other cases kstream.repartition() results in the automatic creation of the repartition topic. Here, it just seems to be created too late, i.e. after the global stream thread already tries to find partition info about it. So the application is shutting down before the topic is created.

To create the topic manually is exactly what we try to avoid. We see it more as an internal topic as it is just serving as the necessary intermediate topic between the transformation and the global store, similar to all these changelog and (other) repartition topics.

The “issue” is that StreamsBuilder#globalTable() assumes the topic exists on startup – it’s considered a user topic and must be created upfront.

If the topic is created, you can use KStream.to() to write into it. Using KStream#repartition() would create the topic but as you correctly observed, it would be create only later (in particular when the first rebalance happens).

Feel free to file feature request, but I am frankly not sure atm how it could be addressed given the current setup/design of Kafka Streams.

1 Like

Thank you Matthias for your answers, if there’s no other way we’ll create the topic upfront.

We won’t file a feature request, in the end the issue popped up because we have to have such an intermediate topic because of the mentioned KAFKA-7663.

1 Like