No repartitioning with `selectKey`

Consider 2 input topics A and B with the same number of partitions in each. Topic A gets data through a custom partitioner.

One stream reads data from A and writes it to a persistentWindowStore.
Another stream reads data from B, looks up a value in the persistentWindowStore based on the timestamp, and publishes an event to the output topic.

Because A is written with a custom partitioner, I tried to use selectKey to get the data repartitioned, but it doesn’t seem to trigger a repartition, which causes my logic to fail.

When I use repartition(Repartitioned.streamPartitioner(...)) it does the repartitioning and things work properly.

I wonder why the selectKey doesn’t trigger a repartitioning in this case.

Code

    val stateStore = "state-store"
    val punctuatorStore = "punctuate-store"

    val builder = StreamsBuilder()

    builder.addStateStore(
        Stores.windowStoreBuilder(
            Stores.persistentWindowStore(
                stateStore,
                Duration.ofDays(2),
                Duration.ofMinutes(1),
                false,
            ),
            Serdes.String(),
            serde.stateStoreSerde,
        ).withLoggingEnabled(changeLogConfigs()),
    )

    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(punctuatorStore),
            Serdes.String(),
            serde.stateStoreSerde,
        ),
    )

    builder.stream(
        "A", // topic A
        Consumed.with<String, B>(Topology.AutoOffsetReset.LATEST)
            .withKeySerde(serde.string)
            .withValueSerde(serde.serdeA),
    ).selectKey { _, b -> b.name }
    .process(
        // store data into the state store
        ProcessorSupplier { StateStoreStorageProcessor(stateStore, serde.serdeA) },
        Named.`as`("storage-flow"),
        stateStore
    )

    builder.stream(
        "B", // topic B
        Consumed.with<String, A>(Topology.AutoOffsetReset.LATEST)
            .withKeySerde(serde.string)
            .withValueSerde(serde.serdeB),
    ).selectKey { _, a -> a.name }
    .process(
        //  create the output with data from B and state store
        ProcessorSupplier { OutputCreateProcessor(stateStore, serde.serdeA) },
        Named.`as`("output-calculator"),
        stateStore
    ).to(
        "Output",
        Produced.with(serde.string, serde.outputSerde)
    )


    builder.build()

Topology description

    Topologies:
       Sub-topology: 0
        Source: KSTREAM-SOURCE-0000000002 (topics: [B])
          --> KSTREAM-KEY-SELECT-0000000003
        Processor: KSTREAM-KEY-SELECT-0000000003 (stores: [])
          --> output-calculator
          <-- KSTREAM-SOURCE-0000000002
        Source: KSTREAM-SOURCE-0000000000 (topics: [A])
          --> KSTREAM-KEY-SELECT-0000000001
        Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
          --> storage-flow
          <-- KSTREAM-SOURCE-0000000000
        Processor: output-calculator (stores: [state-store])
          --> KSTREAM-SINK-0000000004
          <-- KSTREAM-KEY-SELECT-0000000003
        Sink: KSTREAM-SINK-0000000004 (topic: Output)
          <-- output-calculator
        Processor: storage-flow (stores: [state-store])
          --> none
          <-- KSTREAM-KEY-SELECT-0000000001

That’s by design.

In general, selectKey() does not trigger a repartition directly; it only marks the KStream for potential downstream repartitioning, which avoids unnecessary repartition steps. For example, you don’t want a repartition step if you do:

    stream.selectKey().flatMapValues().to("outputTopic");

I.e., repartitioning is “lazy”. If a KStream is marked for repartitioning, a downstream DSL operator like groupByKey() or join() triggers the actual repartitioning, because those operators require the data to be partitioned correctly.
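To see the lazy behavior for yourself, you can compare the topology descriptions of two small pipelines. This is only a minimal sketch: the topic names ("input", "outputTopic", "counts"), the String serdes, and the function names are made up for illustration and are not from the original post.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.Produced

// Key change followed only by a stateless operator and a sink:
// the stream is marked, but nothing downstream requires the new partitioning.
fun describeWithoutRepartition(): String {
    val builder = StreamsBuilder()
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
        .selectKey { _, value -> value }
        .flatMapValues { value -> value.split(" ") }
        .to("outputTopic", Produced.with(Serdes.String(), Serdes.String()))
    return builder.build().describe().toString()
}

// Key change followed by groupByKey().count():
// the aggregation needs correctly partitioned data, so the DSL inserts a repartition topic.
fun describeWithRepartition(): String {
    val builder = StreamsBuilder()
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
        .selectKey { _, value -> value }
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .count()
        .toStream()
        .to("counts", Produced.with(Serdes.String(), Serdes.Long()))
    return builder.build().describe().toString()
}

fun main() {
    println(describeWithoutRepartition())
    println(describeWithRepartition())
}
```

The second description should contain a topic whose name ends in `-repartition` and a second sub-topology, while the first should stay a single sub-topology with no such topic.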

However, PAPI-integration operators like process() or transformXxx() do not evaluate the “repartition flag” and don’t insert the repartition step. The semantics of the provided custom Processor/Transformer are unknown to the DSL, so repartitioning might be unnecessary. It is therefore the user’s responsibility to call repartition() explicitly if this case requires it.
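As a rough sketch of that explicit step, the following places repartition() between selectKey() and process(). It assumes a Kafka Streams version where process() accepts the `org.apache.kafka.streams.processor.api` types (as the code in the question appears to). The `StoreWriter` processor, the "by-name-store" key-value store, the "a-by-name" repartition name, the String serdes, and the key extraction are all hypothetical stand-ins for the real classes and window store in the question.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.kstream.Repartitioned
import org.apache.kafka.streams.processor.api.Processor
import org.apache.kafka.streams.processor.api.ProcessorContext
import org.apache.kafka.streams.processor.api.ProcessorSupplier
import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.Stores

const val STORE = "by-name-store" // hypothetical store name

// Hypothetical terminal processor: writes each record into the state store.
class StoreWriter : Processor<String, String, Void, Void> {
    private lateinit var store: KeyValueStore<String, String>

    override fun init(context: ProcessorContext<Void, Void>) {
        store = context.getStateStore(STORE)
    }

    override fun process(record: Record<String, String>) {
        store.put(record.key(), record.value())
    }
}

fun buildTopology(): Topology {
    val builder = StreamsBuilder()
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE),
            Serdes.String(),
            Serdes.String(),
        ),
    )
    builder.stream("A", Consumed.with(Serdes.String(), Serdes.String()))
        // Assumed value layout "name:payload"; the new key is the name part.
        .selectKey { _, value -> value.substringBefore(':') }
        // process() ignores the repartition flag, so write through a repartition topic explicitly.
        .repartition(Repartitioned.with(Serdes.String(), Serdes.String()).withName("a-by-name"))
        .process(
            ProcessorSupplier<String, String, Void, Void> { StoreWriter() },
            Named.`as`("storage-flow"),
            STORE,
        )
    return builder.build()
}

fun main() {
    println(buildTopology().describe())
}
```

With this in place, the printed description should show a sink and a source for an `a-by-name-repartition` topic ahead of `storage-flow`. Whether the stream from B needs the same treatment depends on how B is partitioned relative to the new key.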

This behavior should really be explained in the JavaDocs. If it is not, would you like to open a PR to improve the JavaDocs accordingly (or file a Jira ticket so we don’t drop it on the floor)?
