I have two topics: one with 1 partition (1A) and one with 5 partitions (5B).
I’m creating a state store from the data on the 5-partition topic so that a stream in another sub-topology (reading topic 1A) can use the state store to enrich its messages.
Doing this, I’ve run into issues because of the sharding of the state store. There are 5 shards of the state store (one per partition), so when a transform is called from the single-partition stream, it only has access to one of the shards, presumably because each stream task only gets its own instance of the store.
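To illustrate what I think is happening, here is a minimal sketch in plain Java (no Kafka dependency). It only models the behaviour I am observing and is not how Kafka Streams is implemented. The class and method names (ShardedStoreSketch, buildShards, lookup) are made up for this example, and the hashCode-based partitioning is a stand-in assumption, not Kafka's real default partitioner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ShardedStoreSketch {

    // Builds one in-memory map per partition, mimicking one store instance per stream task.
    static List<Map<String, String>> buildShards(int partitions, Map<String, String> records) {
        List<Map<String, String>> shards = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            shards.add(new LinkedHashMap<>());
        }
        for (Map.Entry<String, String> e : records.entrySet()) {
            // Assumption: hashCode modulo partition count stands in for the real partitioner.
            int partition = Math.floorMod(e.getKey().hashCode(), partitions);
            shards.get(partition).put(e.getKey(), e.getValue());
        }
        return shards;
    }

    // A task bound to one partition can only read the shard that belongs to that partition.
    static String lookup(List<Map<String, String>> shards, int taskPartition, String key) {
        return shards.get(taskPartition).get(key);
    }

    public static void main(String[] args) {
        Map<String, String> records = new LinkedHashMap<>();
        for (String k : new String[] {"a", "b", "c", "d", "e"}) {
            records.put(k, "value-" + k);
        }
        List<Map<String, String>> five = buildShards(5, records);
        List<Map<String, String>> one = buildShards(1, records);

        // The single-partition stream (1A) only ever runs as the task for partition 0.
        System.out.println("5 shards, task 0 sees keys: " + five.get(0).keySet());
        System.out.println("1 shard, task 0 sees keys: " + one.get(0).keySet());
    }
}
```

With 5 shards, the task for partition 0 finds only the keys that happened to land in shard 0, which matches the missing keys I see. With a single shard it would find all of them, which is what I hoped the repartition would give me.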
So instead of repartitioning the 1-partition topic (1A) up to 5 partitions, I decided to repartition the 5-partition topic down to a 1-partition topic (1B). To my surprise, there still seems to be some sharding going on in the transform in 1B’s topology.
Some code:
builder.stream("5B").repartition(Repartitioned.numberOfPartitions(1)).transformValues(...) // stores values in a key-value store
builder.stream("1A").transformValues(...) // retrieves values to enrich, but it's not getting all the keys; some are missing
I assumed that because the topic was repartitioned from 5 partitions to 1, only one shard would be created in the state store, since the store is used after the repartition. Is there any documentation that describes how this works? Does the repartition not change the stream tasks / state store sharding?
Is the parallelism (the number of tasks) based on the partition count of the topic that builder.stream(this-topic-here) originally reads from, so that any stream processors after the source processor still use the same partitions/threads/tasks/sharding despite the repartitioning?
Would I then need to create a fresh source from the repartitioned topic, as in:
builder.stream("5B").repartition(Repartitioned.numberOfPartitions(1)).to("1B")
builder.stream("1B").transformValues(...)
builder.stream("1A").transformValues(...)
Would this be the same if I used the Processor API instead of the DSL?
That is, do topics have to be co-partitioned in order to use state stores in this manner with the Processor API? Does the same state store sharding apply there?
Apologies for probably misusing some terms.
The output of Topology#describe():
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [5PartitionTopic])
      --> 5PartitionTopic-to-single-repartition-filter
    Processor: 5PartitionTopic-to-single-repartition-filter (stores: [])
      --> 5PartitionTopic-to-single-repartition-sink
      <-- KSTREAM-SOURCE-0000000000
    Sink: 5PartitionTopic-to-single-repartition-sink (topic: 5PartitionTopic-to-single-repartition)
      <-- 5PartitionTopic-to-single-repartition-filter

   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000005 (topics: [1PartitionTopicName])
      --> KSTREAM-TRANSFORMVALUES-0000000006
    Processor: KSTREAM-TRANSFORMVALUES-0000000006 (stores: [5PartitionTopic-state-store])
      --> KSTREAM-FILTER-0000000007
      <-- KSTREAM-SOURCE-0000000005
    Processor: KSTREAM-FILTER-0000000007 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000008
      <-- KSTREAM-TRANSFORMVALUES-0000000006
    Source: 5PartitionTopic-to-single-repartition-source (topics: [5PartitionTopic-to-single-repartition])
      --> KSTREAM-TRANSFORMVALUES-0000000004
    Processor: KSTREAM-KEY-SELECT-0000000008 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KSTREAM-FILTER-0000000007
    Sink: KSTREAM-SINK-0000000009 (topic: myOutputTopic)
      <-- KSTREAM-KEY-SELECT-0000000008
    Processor: KSTREAM-TRANSFORMVALUES-0000000004 (stores: [5PartitionTopic-state-store])
      --> none
      <-- 5PartitionTopic-to-single-repartition-source
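
To check my own reading of the description above, here is a small plain-Java sketch of the rule I believe applies: each sub-topology gets as many tasks (and therefore store shards) as the largest partition count among its source topics. I may have this wrong, which is part of what I am asking. TaskCountSketch and taskCount are names I made up, and the partition count of 1 for the repartition topic is an assumption based on the repartition(1) call.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class TaskCountSketch {

    // Assumed rule: one task per partition of the widest source topic in the sub-topology.
    static int taskCount(Map<String, Integer> sourceTopicPartitions) {
        return Collections.max(sourceTopicPartitions.values());
    }

    public static void main(String[] args) {
        // Sub-topology 0 reads only the 5-partition input topic.
        Map<String, Integer> subTopology0 = new LinkedHashMap<>();
        subTopology0.put("5PartitionTopic", 5);

        // Sub-topology 1 reads the 1-partition topic and the repartition topic.
        Map<String, Integer> subTopology1 = new LinkedHashMap<>();
        subTopology1.put("1PartitionTopicName", 1);
        // Assumption: the repartition topic really was created with 1 partition.
        subTopology1.put("5PartitionTopic-to-single-repartition", 1);

        System.out.println("Sub-topology 0 tasks: " + taskCount(subTopology0));
        System.out.println("Sub-topology 1 tasks: " + taskCount(subTopology1));
    }
}
```

If this rule is right, sub-topology 1 should have a single task and a single shard of 5PartitionTopic-state-store, so I would not expect any keys to be missing. That is why the behaviour surprises me.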