How Kafka Streams splits streamed records

Hello everyone,
when I have a Kafka Streams job for messages with defined keys, then messages with the same key end up on the same partition. And all messages from the same partition are handled by a single job, aren't they? (a single jar)

What if I change the key during the stream? Then the messages are repartitioned. When does that happen? Are the messages with the same new key handled by the same job?

sourceStream
    .selectKey((k, v) -> "MY_KEY") // here I set a key
    .transformValues(() -> new MyTransformer(MY_STORE), MY_STORE);

Assume the source messages have null keys and I run the above stream job with 2 jars.
The source can be spread over many partitions, but the sink will be produced to one partition, won't it?
Will 2 jobs be handling the work, or just 1 jar?

@programista4k Does the documentation on Kafka Streams Architecture help you?
https://docs.confluent.io/platform/current/streams/architecture.html

In particular the section on Parallelism helps describe how Kafka Streams operates across partitions.
https://docs.confluent.io/platform/current/streams/architecture.html#streams-architecture-parallelism-model


Thanks, but I'm having trouble interpreting this.

all messages from the same partition are handled by a single job, aren't they? (single jar)

Let’s use the term “Task” to describe what you are calling a job or “jar”.

Kafka Streams creates a fixed number of stream tasks based on the input stream partitions. Each task is assigned a list of partitions from the input streams. The assignment of the stream partitions to stream tasks never changes.
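As a simplified illustration of why records with the same key land on the same partition: the producer's default partitioner deterministically hashes the key modulo the partition count. (Kafka actually uses murmur2 hashing; this sketch just uses `hashCode` to show the principle.)

```java
import java.util.List;

public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner:
    // a deterministic hash of the key, modulo the partition count.
    // (Kafka really uses murmur2, but the principle is identical.)
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // The same key always maps to the same partition...
        System.out.println(partitionFor("user-42", numPartitions)
                == partitionFor("user-42", numPartitions)); // prints true
        // ...so the one task that owns that partition sees all of its records.
        for (String key : List.of("user-1", "user-2", "user-42")) {
            System.out.println(key + " -> partition "
                    + partitionFor(key, numPartitions));
        }
    }
}
```

Because the hash is deterministic, every record with key `"user-42"` goes to one partition, and the single task assigned that partition processes all of them in order.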

selectKey: Marks the stream for data re-partitioning: Applying a grouping or a join after selectKey will result in re-partitioning of the records. ( https://kafka.apache.org/28/documentation/streams/developer-guide/dsl-api.html)

I don’t fully understand your example, but I do not believe you are performing an operation that will result in an actual repartitioning of the records.

Regardless, I’m not sure it’s critical to understand which particular task processes which particular event. Stages in the topology that consume events for processing are assigned their partitions at the start. It is possible that a topology will not process a single event “all the way through” on one task; transformations that change the partitioning may result in the event being processed by another task.


Auto-repartitioning only happens for “pure” DSL operators (and only lazily). This means that selectKey() does not itself do the repartitioning; it only “marks the stream” for repartitioning. Only if you then apply a key-sensitive operation (like an aggregation or join) to a stream that is marked for repartitioning will the repartitioning actually happen. (As @rick pointed out already.)

However, methods like transformValues do not trigger repartitioning, because they are not “pure” DSL operators but “PAPI integration” methods.
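To make the distinction concrete, here is a sketch (assuming an input topic named "input"; the topic and store names are made up for illustration) contrasting a key-sensitive operator, which triggers the deferred repartitioning, with the transformValues case from the question, which does not:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input");

// selectKey() only MARKS the stream for repartitioning...
KStream<String, String> rekeyed = source.selectKey((k, v) -> "MY_KEY");

// ...and a key-sensitive operator like count() then actually causes
// a repartition topic to be inserted before the aggregation:
rekeyed.groupByKey().count();

// A PAPI integration method such as transformValues() is NOT a
// "pure" DSL operator, so no repartition topic would be inserted:
// rekeyed.transformValues(() -> new MyTransformer(MY_STORE), MY_STORE);

// The printed description shows whether a repartition topic appears:
System.out.println(builder.build().describe());
```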

The JavaDocs actually document all of this in more detail on each method.

Last, you can inspect what topology is generated via:

Topology t = builder.build(); // or builder.build(props);
System.out.println(t.describe());

The topology description will show you whether a repartition topic is inserted. If you want to repartition explicitly, you can use the KStream#repartition() method.
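For example, assuming you want the rekeyed stream spread over an explicit number of partitions, a repartition step could look like this (the variable and topic-name suffix are illustrative):

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

// Explicitly repartition the rekeyed stream through an internal topic.
KStream<String, String> repartitioned = rekeyed.repartition(
        Repartitioned.<String, String>as("my-repartition") // internal topic name suffix
                .withNumberOfPartitions(4));               // explicit partition count
```

Unlike the lazy marking done by selectKey(), repartition() always inserts the internal topic, which also lets you control its partition count.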
