Consider two input topics, A and B, with the same number of partitions. Records are written to topic A using a custom partitioner.
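The Kafka-free Kotlin sketch below shows why the custom partitioner matters: the same key can land in a different partition depending on which partitioner placed it. Both functions are stand-ins:

- customPartition is a hypothetical length-based partitioner, not the real custom partitioner.
- defaultLikePartition uses String.hashCode only as a simplified substitute for Kafka's default partitioner, which hashes the serialized key bytes with murmur2.

```kotlin
// Hypothetical custom partitioner for topic A (illustration only):
// chooses the partition from the length of the key.
fun customPartition(key: String, numPartitions: Int): Int =
    key.length % numPartitions

// Simplified stand-in for a default hash partitioner. Kafka's real default
// partitioner applies murmur2 to the serialized key bytes; String.hashCode
// is used here only to keep the sketch dependency-free.
fun defaultLikePartition(key: String, numPartitions: Int): Int =
    (key.hashCode() and Int.MAX_VALUE) % numPartitions

fun main() {
    val numPartitions = 4 // both topics have the same number of partitions
    for (name in listOf("alice", "bob", "carol")) {
        val viaCustom = customPartition(name, numPartitions)
        val viaDefault = defaultLikePartition(name, numPartitions)
        // When the two numbers differ, the records are read by different
        // stream tasks, so they never meet in the same local state store.
        println("$name: custom=$viaCustom default=$viaDefault sameTask=${viaCustom == viaDefault}")
    }
}
```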
One stream reads data from A and writes it to a persistentWindowStore. Another stream reads data from B, reads a value from the persistentWindowStore based on the timestamp, and publishes an event to the output topic.
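The store-then-lookup flow can be pictured with the in-memory sketch below. It is not the Kafka WindowStore API; ToyWindowStore is a hypothetical stand-in. Because the processor code is not shown, the sketch assumes two things:

- Values from A are stored under the start of their one-minute window.
- A record from B looks up the window that contains its timestamp.

Retention (two days in the question) and duplicate handling are left out.

```kotlin
// Window size taken from the store definition in the question: one minute.
val windowSizeMs = 60_000L

// Start of the window that contains the given timestamp.
fun windowStart(timestampMs: Long): Long = timestampMs - (timestampMs % windowSizeMs)

// Hypothetical stand-in for the window store: (key, windowStart) -> value.
// Assumes values are keyed by window start; retention is not modelled.
class ToyWindowStore<V> {
    private val data = HashMap<Pair<String, Long>, V>()

    // Storage flow (topic A): keep the value under its window start.
    fun put(key: String, value: V, timestampMs: Long) {
        data[key to windowStart(timestampMs)] = value
    }

    // Output flow (topic B): value stored for the same key in the window
    // containing timestampMs, or null if there is none.
    fun fetch(key: String, timestampMs: Long): V? = data[key to windowStart(timestampMs)]
}
```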
Because of the custom partitioning of A, I tried to use selectKey to get the data repartitioned, but it doesn't seem to trigger a repartition, and this causes my logic to fail.
When I use repartition(Repartitioned.streamPartitioner(...)) instead, the data is repartitioned and everything works correctly.
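The toy model below contrasts the two cases, assuming one local store per partition (Kafka Streams keeps one state-store instance per task). In Kafka Streams, selectKey changes only the key: the record stays in the partition it was read from. According to the KStream#process javadoc, no automatic repartition is triggered before process(), even after a key-changing operation. The Rec class, partitionFor and the partition numbers are hypothetical. partitionFor stands in for whatever partitioner is passed to Repartitioned.streamPartitioner(...).

```kotlin
// A record as stored in an input topic: the partition was chosen by the producer,
// and the key is the one produced by selectKey (here: the entity name).
data class Rec(val name: String, val partition: Int)

val numPartitions = 3

// Hypothetical partitioner applied when records are repartitioned:
// the partition is derived from the new key.
fun partitionFor(key: String): Int = (key.hashCode() and Int.MAX_VALUE) % numPartitions

// One local store per partition, mirroring one state-store instance per task.
fun lookupSucceeds(a: Rec, b: Rec, repartition: Boolean): Boolean {
    val stores = List(numPartitions) { HashMap<String, String>() }
    // Storage flow: without a repartition, the A record stays in its original partition.
    val aPartition = if (repartition) partitionFor(a.name) else a.partition
    stores[aPartition][a.name] = "value-from-A"
    // Output flow: the B record is handled by the task that owns its partition.
    val bPartition = if (repartition) partitionFor(b.name) else b.partition
    return stores[bPartition].containsKey(b.name)
}

fun main() {
    // Same name, but the producers of A and B wrote the records to different partitions.
    val a = Rec(name = "alice", partition = 1)
    val b = Rec(name = "alice", partition = 2)
    println("selectKey only:   ${lookupSucceeds(a, b, repartition = false)}") // false
    println("with repartition: ${lookupSucceeds(a, b, repartition = true)}")  // true
}
```

With repartitioning, both records are routed by the same function of the new key, so the lookup always hits the store that holds the A value.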
Why doesn't selectKey trigger a repartition in this case?
Code
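import java.time.Duration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.processor.api.ProcessorSupplier
import org.apache.kafka.streams.state.Stores

// serde, changeLogConfigs(), StateStoreStorageProcessor and OutputCreateProcessor
// are defined elsewhere in the application.
val stateStore = "state-store"
val punctuatorStore = "punctuate-store"
val builder = StreamsBuilder()

// Window store written by the storage flow (topic A) and read by the output flow (topic B).
builder.addStateStore(
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(
            stateStore,
            Duration.ofDays(2),    // retention period
            Duration.ofMinutes(1), // window size
            false,                 // retainDuplicates
        ),
        Serdes.String(),
        serde.stateStoreSerde,
    ).withLoggingEnabled(changeLogConfigs()),
)
builder.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(punctuatorStore),
        Serdes.String(),
        serde.stateStoreSerde,
    ),
)

builder.stream(
    "A", // topic A
    Consumed.with<String, A>(Topology.AutoOffsetReset.LATEST)
        .withKeySerde(serde.string)
        .withValueSerde(serde.serdeA),
).selectKey { _, a -> a.name } // re-key by name
    .process(
        // store data into the state store
        ProcessorSupplier { StateStoreStorageProcessor(stateStore, serde.serdeA) },
        Named.`as`("storage-flow"),
        stateStore,
    )

builder.stream(
    "B", // topic B
    Consumed.with<String, B>(Topology.AutoOffsetReset.LATEST)
        .withKeySerde(serde.string)
        .withValueSerde(serde.serdeB),
).selectKey { _, b -> b.name } // re-key by name
    .process(
        // create the output with data from B and the state store
        ProcessorSupplier { OutputCreateProcessor(stateStore, serde.serdeA) },
        Named.`as`("output-calculator"),
        stateStore,
    ).to(
        "Output",
        Produced.with(serde.string, serde.outputSerde),
    )

builder.build()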
Topology description
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000002 (topics: [B])
      --> KSTREAM-KEY-SELECT-0000000003
    Processor: KSTREAM-KEY-SELECT-0000000003 (stores: [])
      --> output-calculator
      <-- KSTREAM-SOURCE-0000000002
    Source: KSTREAM-SOURCE-0000000000 (topics: [A])
      --> KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
      --> storage-flow
      <-- KSTREAM-SOURCE-0000000000
    Processor: output-calculator (stores: [state-store])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000003
    Sink: KSTREAM-SINK-0000000004 (topic: Output)
      <-- output-calculator
    Processor: storage-flow (stores: [state-store])
      --> none
      <-- KSTREAM-KEY-SELECT-0000000001