Stream partitions and repartitioning topics for state store lookups

I have two topics: one configured with 1 partition (1A) and one with 5 partitions (5B).
I’m creating a state store from the data on the 5-partition topic so that a stream in another sub-topology (sourced from topic 1A) can use the state store to enrich its messages.

Doing this, I’ve run into issues because of the sharding of the state store. There are 5 shards of the state store, so when a transform is called from the single-partition stream, it only has access to one of the shards due to stream threading etc.

So instead of repartitioning the 1-partition topic (1A) up, I decided to repartition the 5-partition topic down to a 1-partition topic (1B). To my surprise, there still seems to be some sharding going on in the transform in 1B’s topology.

some code:

builder.stream(5B).repartition(Repartitioned.numberOfPartitions(1)).transformValues(...) // stores values in a key-value store

builder.stream(1A).transformValues(...) // retrieves values to enrich, but it’s not getting all the keys; some are missing
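
For reference, here’s roughly how the full wiring looks; the store name matches my describe output below, but the String types, serdes, and enrichment logic are simplified placeholders rather than my real code:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

// Register the key-value store so it can be connected to the transformers below.
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("5PartitionTopic-state-store"),
        Serdes.String(), Serdes.String()));

// Repartition 5B down to 1 partition, then write every record into the store.
builder.<String, String>stream("5PartitionTopic")
        .repartition(Repartitioned.numberOfPartitions(1))
        .transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
            private KeyValueStore<String, String> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                store = (KeyValueStore<String, String>) context.getStateStore("5PartitionTopic-state-store");
            }

            @Override
            public String transform(final String key, final String value) {
                store.put(key, value); // populate the lookup store
                return value;
            }

            @Override
            public void close() { }
        }, "5PartitionTopic-state-store");

// 1A looks up each key in the same store to enrich its records.
builder.<String, String>stream("1PartitionTopicName")
        .transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
            private KeyValueStore<String, String> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                store = (KeyValueStore<String, String>) context.getStateStore("5PartitionTopic-state-store");
            }

            @Override
            public String transform(final String key, final String value) {
                final String enrichment = store.get(key); // comes back null for some keys
                return enrichment == null ? value : value + "|" + enrichment;
            }

            @Override
            public void close() { }
        }, "5PartitionTopic-state-store");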

I assumed that because the topic was repartitioned from 5 partitions down to 1, only one shard of the state store would be created, since the store is used after the repartition. Is there any documentation that describes how this works? Does the repartition not change the stream tasks / state store sharding?

Is the parallelism (number of tasks) based on the partition count of the topic that builder.stream(this-topic-here) originally uses, so that any stream processors after the source processor still use the same partitions/threads/tasks/sharding despite the repartitioning?

Would I then need to create a fresh new source from the repartitioned topic? As in:

builder.stream(5B).repartition(Repartitioned.numberOfPartitions(1)).to(1B)

builder.stream(1B).transformValues(...) // stores values in the key-value store

builder.stream(1A).transformValues(...) // enriches using the state store

Would this be the same if I used the Processor API instead of the DSL? Meaning, do topics have to be co-partitioned in order to use state stores in this manner with the Processor API? Does the same state store sharding apply to the Processor API?

Apologies for probably misusing some terms.

The describe output:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [5PartitionTopic])
      --> 5PartitionTopic-to-single-repartition-filter
    Processor: 5PartitionTopic-to-single-repartition-filter (stores: [])
      --> 5PartitionTopic-to-single-repartition-sink
      <-- KSTREAM-SOURCE-0000000000
    Sink: 5PartitionTopic-to-single-repartition-sink (topic: 5PartitionTopic-to-single-repartition)
      <-- 5PartitionTopic-to-single-repartition-filter

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000005 (topics: [1PartitionTopicName])
      --> KSTREAM-TRANSFORMVALUES-0000000006
    Processor: KSTREAM-TRANSFORMVALUES-0000000006 (stores: [5PartitionTopic-state-store])
      --> KSTREAM-FILTER-0000000007
      <-- KSTREAM-SOURCE-0000000005
    Processor: KSTREAM-FILTER-0000000007 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000008
      <-- KSTREAM-TRANSFORMVALUES-0000000006
    Source: 5PartitionTopic-to-single-repartition-source (topics: [5PartitionTopic-to-single-repartition])
      --> KSTREAM-TRANSFORMVALUES-0000000004
    Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KSTREAM-FILTER-0000000007
    Sink: KSTREAM-SINK-0000000009 (topic: myOutputTopic)
      <-- KSTREAM-KEY-SELECT-0000000008
    Processor: KSTREAM-TRANSFORMVALUES-0000000004 (stores: [5PartitionTopic-state-store])
      --> none
      <-- 5PartitionTopic-to-single-repartition-source

I assumed that because the topic was repartitioned from 5 partitions down to 1, only one shard of the state store would be created, since the store is used after the repartition.

That sounds correct, i.e., this is what should happen.

Is the parallelism (number of tasks) based on the partition count of the topic that builder.stream(this-topic-here) originally uses, so that any stream processors after the source processor still use the same partitions/threads/tasks/sharding despite the repartitioning?

No. The parallelism (i.e., the number of tasks) is computed per sub-topology, based on the number of partitions of the sub-topology’s source topics. If a sub-topology has more than one source topic, the maximum number of partitions over all its source topics determines the number of tasks (and thus the number of store shards).

Cf. Streams Architecture | Confluent Documentation

In your case, sub-topology 1 has two source topics, 1PartitionTopicName and 5PartitionTopic-to-single-repartition. If both have 1 partition, you should also get a single task and a single shard. Overall, you should thus have 6 tasks: 5 for sub-topology 0 (named 0_0, 0_1, 0_2, 0_3, 0_4) and one for sub-topology 1 (named 1_0). The naming scheme is <sub-topology-id>_<partition-id>.

Did you check how many tasks are created? This information is logged.

Would I then need to create a fresh new source from the repartitioned topic? As in:

No, as explained above.

Would this be the same if I used the Processor API instead of the DSL? Meaning, do topics have to be co-partitioned in order to use state stores in this manner with the Processor API? Does the same state store sharding apply to the Processor API?

Yes. At the runtime level, we only know about a Topology, and we don’t even know whether the Topology was built using the PAPI or the DSL.
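
For illustration only (processor and sink names are made up for this example), a rough PAPI equivalent of your wiring could look like the sketch below; Kafka Streams derives sub-topologies, tasks, and store shards from the resulting Topology in exactly the same way:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

Topology topology = new Topology();
topology.addSource("repartitioned-source", "5PartitionTopic-to-single-repartition");
topology.addSource("enrich-source", "1PartitionTopicName");

// Writer: puts every record from the repartitioned topic into the store.
topology.addProcessor("store-writer",
        () -> new Processor<String, String, Void, Void>() {
            private KeyValueStore<String, String> store;

            @Override
            public void init(final ProcessorContext<Void, Void> context) {
                store = context.getStateStore("5PartitionTopic-state-store");
            }

            @Override
            public void process(final Record<String, String> record) {
                store.put(record.key(), record.value());
            }
        }, "repartitioned-source");

// Enricher: looks up each record's key in the same store.
topology.addProcessor("enricher",
        () -> new Processor<String, String, String, String>() {
            private KeyValueStore<String, String> store;
            private ProcessorContext<String, String> context;

            @Override
            public void init(final ProcessorContext<String, String> context) {
                this.context = context;
                store = context.getStateStore("5PartitionTopic-state-store");
            }

            @Override
            public void process(final Record<String, String> record) {
                final String enrichment = store.get(record.key());
                context.forward(record.withValue(record.value() + "|" + enrichment));
            }
        }, "enrich-source");
topology.addSink("output-sink", "myOutputTopic", "enricher");

// Connecting the same store to both processors pulls them into one
// sub-topology, so the same task and shard rules apply as with the DSL.
topology.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("5PartitionTopic-state-store"),
        Serdes.String(), Serdes.String()), "store-writer", "enricher");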

However, it seems something is wrong in your topology. You have

Processor: KSTREAM-TRANSFORMVALUES-0000000006 (stores: [5PartitionTopic-state-store])

and

KSTREAM-TRANSFORMVALUES-0000000004 (stores: [enriched2080-state-store])

I.e., two processors, each with one store. If you want to compute a join, you need a single processor that is connected to both stores. It seems your topology “wiring” might not be correct.
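
For example, a single transformer connected to both stores would look roughly like this (sketch only: “left-store” and “right-store” are hypothetical names, and the stream variable plus both store registrations are assumed to exist; imports as in the earlier sketch):

// Sketch only: one processor connected to BOTH stores.
stream.transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
            private KeyValueStore<String, String> left;
            private KeyValueStore<String, String> right;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                left = (KeyValueStore<String, String>) context.getStateStore("left-store");
                right = (KeyValueStore<String, String>) context.getStateStore("right-store");
            }

            @Override
            public String transform(final String key, final String value) {
                // Because both stores are attached to this one processor, a
                // single task owns the matching shard of each and can join them.
                return left.get(key) + "|" + right.get(key);
            }

            @Override
            public void close() { }
        }, "left-store", "right-store"); // both store names passed to ONE transformValues()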

Oops, I was trying to scrub the describe output to replace some of the names for context and forgot one. I’ve edited it.

I assume those are INFO-level logs that are enabled by default. I’ll check.

I figured out why it didn’t work. The single-partition topic, as it turns out, had 5 partitions in our staging environment.

Last question: how can the functionality of a GlobalKTable be achieved with the Processor API? I.e., how can a Processor API application take a state store and replicate the data across all shards?


You can call Topology#addGlobalStore() (StreamsBuilder#globalTable() does the same thing internally).

Note that the Processor you pass into addGlobalStore() is not allowed to modify the data; it should only take each kv-pair and put it into the store without modification.
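
A minimal sketch of what that can look like (topic, store, and processor names are placeholders); note that a global store must be created with logging disabled:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

Topology topology = new Topology();
topology.addGlobalStore(
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("global-enrichment-store"),
                Serdes.String(), Serdes.String())
            .withLoggingDisabled(), // global stores must not have a changelog
        "global-source",
        Serdes.String().deserializer(), Serdes.String().deserializer(),
        "5PartitionTopic",
        "global-store-updater",
        () -> new Processor<String, String, Void, Void>() {
            private KeyValueStore<String, String> store;

            @Override
            public void init(final ProcessorContext<Void, Void> context) {
                store = context.getStateStore("global-enrichment-store");
            }

            @Override
            public void process(final Record<String, String> record) {
                // Only copy the kv-pair into the store; do not modify it.
                store.put(record.key(), record.value());
            }
        });

Each application instance then restores the full contents of the input topic into its local copy of the store, instead of one shard per task.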
