How to correctly repartition by value

We have records that are identified by their keys and that additionally have a groupId on the value (multiple keys may belong to the same group, records with a specific key always belong to the same group). We must ensure that all records of the same group are consumed by the same consumer, but without changing the key, as we need support for tombstone events per key, not per groupId.

So we want to forward records from the source topic to the target topic, thereby keeping both the keys and values, but repartition the records by the value’s groupId.

While this sounds trivial on a first thought, it seems not to be when thinking of the tombstone events. To publish a tombstone event to the same partition as the former record with the same key, I would either need to keep track of what key was published to which partition, or alternatively what key belongs to which group.
To do so, I thought of using a KeyValueStore from the ProcessorContext. However, the StreamPartitioner doesn’t get initialized with the ProcessorContext and thus has no access to the KeyValueStore. What would probably work is to

  • first transform the value to some temporary tuple consisting of the value itself and the target partition (as a ValueTransformer is initialized with the ProcessorContext and thus has access to the KeyValueStore),
  • then repartition the stream, and
  • finally transform the temporary value back to the original value.

Maybe it’s just me, but this feels overly complicated for this (standard) use case, so I have the feeling there must be a simpler and more natural approach. So what would be the right way to do such a repartitioning based on a value property?

Hi - IIUC, what you’ll want to do is use the KStream.repartition(Repartitioned repartitioned) method. The Repartitioned class has a withStreamPartitioner method where you supply a StreamPartitioner instance, and that should allow you to partition records by a field on the value easily.


Hi Bill, I’m afraid, the approach you described is the obvious one that works as long as I don’t need to publish a tombstone event to the same partition as a regular event.
Maybe an example makes it more clear:
A record {k,{groupId, …}} has a key (k) and a value ({groupId, …}), the value in turn has a property groupId among others. So I need to determine the target partitions of records as follow:
{1, {20, …}} → partition = hash(20)
{7, {41, …}} → partition = hash(41)
{4, {20, …}} → partition = hash(20)
so far so good, the records with key 1 and 4 are published to the same partition as the have the same groupId in the value. But now the tricky part:
{4, null} → partition = ???
the tombstone event has no value, hence no groupId, so I would need to keep track that previously a record with key 4 was published to partition = hash(20)

I hope the example makes it more clear.

Hi @pat - that’s right, I overlooked the tombstone part of your requirements. The only thing I can think of is that you’d need to implement some tracking of keys to previous values, which you already indicated.

The solution we ended up is still using a KeyValueStore from within a Transformer, but we had to adapt our approach, maybe this helps others too, so let me summarize:

1. approach (failed):
As stated in the initial question, our first approach was to just re-partition, thereby taking the state of a KeyValueStore into account. But this is not possible as StreamPartitioner is not initialized with the ProcessorContext and thus has no access to any KeyValueStores, which is needed to correctly handle tombstone events.

2. approach (failed):
Next, as stated in the initial question too, we wanted to

  1. transform the value by adding the value’s property (groupId) needed to determine the target partition,
  2. re-partition the stream based on the groupId from within the new value,
  3. transform the value back to the original value by removing the value’s property (groupId), and finally
  4. publish to the target topic (keeping the partitioning).

However, that approach doesn’t work neither, as during publish the stream is always re-partitioned, either by the default partitioner (partitioning based on the key only) or by a custom partitioner (which - again - can partition only based on the key and/or value). So the re-partitioning done in step 2 was to no avail and we were back to square one. This is related to [KAFKA-10448] Preserve Source Partition in Kafka Streams from context - ASF JIRA : if we could just preserve the partitioning from step 2 in step 4, this approach would work. But right now, there’s no other way than to either change the key or the value. We’ve chosen to change the key such that we can still have a null value for tombstone events.

3. approach (succeeded):
In the end we do the following:

  1. transform the key (hence the record) by adding the value’s property (groupId) needed to determine the target partition (using the KeyValueStore in case of a tombstone event),
  2. publish to the target topic, thereby partitioning based on the groupId from within the new key.

With this approach we could implement the re-partitioning, but we had to give up keeping key and value unchanged.