Understanding Aggregate KTables Subtractor

Hi, I’m new to Kafka and trying to use it more. I am trying to understand the Aggregator, or more precisely the Subtractor, on a KTable. From what I have read in the documentation, the subtractor and the adder are both called (given the aggregate has already been initialized). So if you have an adder like +1L and a subtractor like -1L (as for a simple count), you would end up with no change at all if both are called. What part am I missing here?

Below is the relevant part from the guide: https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating

When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.

They may be called for different keys. Let’s look at an example. Assume you have an input KTable as follows:

key | value
-----------
 A  | (a, 10)
 B  | (a, 20)
 C  | (b, 15)

You groupBy(value.first) and sum(value.second) and get a result table:

key | value
-----------
 a  | 30
 b  | 15

Now the input table gets updated: <B,(a,20)> changes to <B,(b,5)>. For this case, to update the second table, 20 needs to be subtracted for a, and 5 needs to be added for b resulting in:

key | value
-----------
 a  | 10
 b  | 20

Hence, the adder and subtractor are applied to different rows in the result table to compute the update.
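
To make the example above concrete, here is a minimal plain-Java sketch that models the two tables with ordinary maps. It is not the Kafka Streams API: the class GroupedSumSketch, the Row type, and the upsert method are hypothetical names chosen for illustration, and the subtract-then-add order is just one possible order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "groupBy(value.first) and sum(value.second)".
// All names here are hypothetical; this is not Kafka Streams code.
class GroupedSumSketch {
    // One input row value: the grouping key (first) and the amount to sum (second).
    static final class Row {
        final String group;
        final long amount;
        Row(String group, long amount) { this.group = group; this.amount = amount; }
    }

    private final Map<String, Row> input = new HashMap<>();   // input table
    private final Map<String, Long> result = new TreeMap<>(); // result table

    // Insert or update one input row and keep the result table in sync.
    void upsert(String key, Row newRow) {
        Row oldRow = input.put(key, newRow);
        if (oldRow != null) {
            // "subtractor": remove the old amount from the OLD group
            result.merge(oldRow.group, -oldRow.amount, Long::sum);
        }
        // "adder": add the new amount to the NEW group
        result.merge(newRow.group, newRow.amount, Long::sum);
    }

    Map<String, Long> result() { return result; }

    public static void main(String[] args) {
        GroupedSumSketch t = new GroupedSumSketch();
        t.upsert("A", new Row("a", 10));
        t.upsert("B", new Row("a", 20));
        t.upsert("C", new Row("b", 15));
        System.out.println(t.result()); // {a=30, b=15}
        t.upsert("B", new Row("b", 5)); // <B,(a,20)> changes to <B,(b,5)>
        System.out.println(t.result()); // {a=10, b=20}
    }
}
```

The update to B touches two result rows: 20 is subtracted from a and 5 is added to b, which is why a count with +1L and -1L does not cancel out in this case.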


Thank you very much for your detailed explanation Matthias. I have a little follow up question.

In the last part of your example, assume that instead of changing from <B,(a,20)> to <B,(b,5)>, the row changes to <B,(a,25)>. Would it be correct that the subtractor is then never called? Or would it technically subtract 20 and add 25?

And lastly, I tried to find the code for this part. Could you tell me if this is the correct link? kafka/KTableAggregate.java at 77e3ca019efe38195ec28ddab87aed52e1c403c8 · apache/kafka · GitHub

Thanks a lot.

The “magic” happens in here: kafka/KTableRepartitionMap.java at 6ce69021fd1daa7bf39a9796152aff260b633b3a · apache/kafka · GitHub

The “map” processor is put in between the upstream and downstream table, i.e., upstreamKTable -> RepartitionMap -> repartition-topic -> downstreamAggregator -> downstreamKTable

The upstream KTable will send a Change record with the old and new value for the updated row, and the Change record is split into two. Downstream, one of the records will invoke the adder, and the other one will invoke the subtractor. So it’s always two records, and even if the key does not change (as in your example), a subtraction and an addition are both executed, one after the other.
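
As a rough illustration of the split described above, the following plain-Java sketch models a change (old value, new value) being turned into two downstream records. It is only a model under stated assumptions: ChangeSplitSketch, Row, Update, split, and applyChange are hypothetical names, and the real KTableRepartitionMap code is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of splitting one change into a "subtract" record and
// an "add" record. All names are hypothetical; this is not Kafka code.
class ChangeSplitSketch {
    static final class Row {
        final String group;
        final long amount;
        Row(String group, long amount) { this.group = group; this.amount = amount; }
    }

    // A downstream record: the grouping key, the amount, and whether it
    // is routed to the adder (true) or to the subtractor (false).
    static final class Update {
        final String group;
        final long amount;
        final boolean isAdd;
        Update(String group, long amount, boolean isAdd) {
            this.group = group; this.amount = amount; this.isAdd = isAdd;
        }
    }

    // One change with an old and a new value becomes two records,
    // even when both values map to the same grouping key.
    static List<Update> split(Row oldRow, Row newRow) {
        List<Update> out = new ArrayList<>();
        if (oldRow != null) out.add(new Update(oldRow.group, oldRow.amount, false));
        if (newRow != null) out.add(new Update(newRow.group, newRow.amount, true));
        return out;
    }

    // Apply every record of a change to the aggregate; returns the record count.
    static int applyChange(Map<String, Long> agg, Row oldRow, Row newRow) {
        List<Update> updates = split(oldRow, newRow);
        for (Update u : updates) {
            long delta = u.isAdd ? u.amount : -u.amount;
            agg.merge(u.group, delta, Long::sum);
        }
        return updates.size();
    }

    public static void main(String[] args) {
        Map<String, Long> agg = new HashMap<>();
        agg.put("a", 30L);
        // <B,(a,20)> changes to <B,(a,25)>: same grouping key, still two records
        int n = applyChange(agg, new Row("a", 20), new Row("a", 25));
        System.out.println(n + " records, a = " + agg.get("a")); // 2 records, a = 35
    }
}
```

For a sum, the result is the same whichever of the two records is applied first: 30 - 20 + 25 and 30 + 25 - 20 both give 35.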

