Using aggregate() with a class: a new instance is created each time and nothing is aggregated

I am using Kotlin, trying to follow the example here (in Java).

private val distance = trips
    .mapValues { _, trip -> objectMapper.readValue(trip, Trip::class.java) }
    .peek { _, trip -> logger.info("Trip", trip) }
    .groupByKey()
    .windowedBy(SessionWindows.with(Duration.ofSeconds(10)).grace(Duration.ofSeconds(0)))


private val aggregatedStreams = distance.aggregate(
    ::AggClass,
    { key, trip, agg -> agg.update(trip.startLat, trip.startLng, trip.endLat, trip.endLng) },
    { key, oldAgg, updatedAgg -> oldAgg.merge(updatedAgg) },
    Materialized.`as`<String, AggClass, SessionStore<Bytes, ByteArray>>("stateStoreName")
        .withKeySerde(Serdes.String())
        .withValueSerde(tripSerDes())
        .withLoggingDisabled()
).suppress(Suppressed.untilWindowCloses(unbounded()))

The issue is with the aggregate() function. I can see that the agg.update method is called and returns the correct values. However, the values in the aggregator are never updated. At the end, the output from the aggregation is the same as the initial values for the class.

Have I made some mistake in the syntax? When using simple aggregators like Double it worked fine.

The aggregation class looks like this:


class AggClass {
    private var startLat: Double? = null
    private var endLat: Double? = null
    private var startLng: Double? = null
    private var endLng: Double? = null
    var totalDistance: Double? = null


    constructor() {}
    constructor(startLat: Double?, startLng: Double?, endLat: Double?, endLng: Double?, totalDistance: Double?) {
        this.startLat = startLat
        this.endLat = endLat
        this.startLng = startLng
        this.endLng = endLng
        this.totalDistance = totalDistance
    }

    fun update(lat: Double?, lng: Double?, totalDistance: Double?): AggClass {
        println(this)

        if (this.startLat == null && this.startLng == null) {
            this.startLat = lat
            this.startLng = lng
        }
        this.endLat = lat
        this.endLng = lng
        // Removed some of the calculations here
        println("------")
        println(this)
        return this
    }

    fun merge(latest: AggClass): AggClass {
        return if (this.startLng == null) latest else AggClass(this.startLat, this.startLng, latest.endLat, latest.endLng, latest.totalDistance)
    }
}

Printing out the class from the update function shows the instance variables are being updated, e.g.:
AggClass(startLat=2.424, endLat=2.424, startLng=12.325, endLng=12.325, totalDistance=5245.0)

However the final output is always
AggClass(startLat=null, endLat=null, startLng=null, endLng=null, totalDistance=null)

Not 100% sure.

However values in the aggregator are never updated. At the end the output from the aggregation is the same as the initial values for the class.

Can you provide an example (including timestamps)? How do you access the result? I don’t see any downstream code after the aggregation.

Here is the code that sends the aggregation to the output topic.

KafkaStreamsUtil is just a helper that calls `val kafkaStreams = KafkaStreams(streamsBuilder.build(), properties)` and `kafkaStreams.start()` with some error handling.

        aggregatedStreams
            .toStream { windowedKey, _ -> windowedKey.key() }
            .filter { _, v -> v != null }
            .peek { key, value -> logger.info("Key {} value {}", key, value.toString()) }
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.serdeFrom(TrpSerializer(), TrpDeserializer())))
        KafkaStreamsUtil(streamsBuilder, properties).start()

So to get the values of the instance variables when agg.update() is called, I’m literally logging the object’s toString(). The results sent to the output topic are logged by key, value -> logger.info("Key {} value {}", key, value.toString()).

Here’s some sample output.

Looking at this, it seems the message to the output topic (second-to-last line, null values) is sent before the agg.update() method is called (logged in the last line).

I have a little trouble linking the log lines to the code snippet you shared.

After reading the input, there is a first peek

.peek { _, trip -> logger.info("Trip",trip) }

but I don’t see any "Trip" log line. However, there is a "New message from Kafka" log line, and it’s unclear where it comes from.

There is also a peek before you write into the output topic:

.peek{ key, value -> logger.info("Key {} value {}", key, value.toString())}

This peek seems to write the other two log lines? So you should get 2 messages in the output topic?

It might also be helpful to log something when init(), aggregate(), and merge() are called.

Sorry for the lack of clarity.

but I don’t see any "Trip" log line. However, there is a "New message from Kafka" log line, and it’s unclear where it comes from.

I had changed "Trip" to "New message from Kafka" in

.peek { _, trip -> logger.info("Trip",trip) }

And forgot to update the post to reflect this. I have changed it back to "Trip" now to avoid confusion.

This peek seems to write the other two log lines? So you should get 2 messages in the output topic?

.peek{ key, value -> logger.info("Key {} value {}", key, value.toString())}

That peek wrote one of the lines; the other line comes from a logger in the update method. I’ve labelled things a bit more clearly now.

I’ve added in the extra logging. Here is the output of two messages (same key).

Notice in message 2 that the constructor is being called (twice?). Specifically, this constructor: constructor() {}. A new instance appears to be created for each message, so no aggregation can occur. Perhaps I am using incorrect syntax for it.

The merge() method also seems to be called before update() in message 2, which seems incorrect.

In the above screenshots:

INFO :: Trip

^ Indicates a message received.

Start agg.update() is the output of aggClass.toString() when the method is called.

End agg.update() is the output of aggClass.toString() at the end of the method.

agg.merge() again is the output of aggClass.toString() when the merge() method is called.

Output to topic is what is sent to the output Kafka topic.

Thanks for the details. I think what you see is correct (even if I understand why it’s confusing). It’s a side effect of how the aggregation is implemented internally (cf. kafka/KStreamSessionWindowAggregate.java at 765e588bdd29d9f971ad26c945a3436688be9f66 · apache/kafka · GitHub); the current implementation is used because it simplifies the runtime code.

Each new input record first creates its own (empty) session. If we find overlapping sessions, we merge them. This step is required because an out-of-order record could actually connect two existing sessions. Assume a gap of 3 seconds, one session that ends at time 10, and a second session that starts at time 15. Both sessions are distinct because they are 5 seconds apart (larger than the gap of 3 seconds). An out-of-order record with timestamp 13 would connect both sessions, though, and they must be merged.

Only after the existing sessions are merged is the new record added.

Thus, a new record that is within the “gap” of an existing session is not directly added to the existing session. Instead, its “own session” (which is still empty) is merged into the existing sessions first, and only after the existing session has been extended is the record added.

Thus, your code is totally fine. However, you should not rely on the order of calls to implement your business logic. It’s considered an implementation detail and might change at any time.
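The order of calls described above can be sketched in plain Kotlin, without Kafka. This is only an illustrative model under assumptions: the Session class, the addRecord function and the record counts are hypothetical and not part of Kafka Streams, and the overlap test is a simplification of what the session store lookup does.

```kotlin
// Hypothetical model of a session: a time span plus a record count.
data class Session(val start: Long, val end: Long, val count: Int)

// Adds one record in the order the reply describes: create an empty session
// for the record, merge every overlapping session into it, then add the record.
fun addRecord(sessions: List<Session>, timestamp: Long, gap: Long): List<Session> {
    // Existing sessions that lie within `gap` of the new record's timestamp.
    val (overlapping, untouched) = sessions.partition {
        it.end + gap >= timestamp && it.start - gap <= timestamp
    }
    // Step 1: the record's own, still empty, session (the initializer call).
    var merged = Session(timestamp, timestamp, 0)
    // Step 2: merge each overlapping session into it (the merger calls).
    for (s in overlapping) {
        merged = Session(
            minOf(merged.start, s.start),
            maxOf(merged.end, s.end),
            merged.count + s.count
        )
    }
    // Step 3: only now is the new record itself added (the aggregator call).
    merged = merged.copy(count = merged.count + 1)
    return (untouched + merged).sortedBy { it.start }
}

fun main() {
    // One session ends at 10, another starts at 15, and the gap is 3.
    val existing = listOf(Session(5, 10, 2), Session(15, 20, 3))
    // An out-of-order record at 13 connects both sessions.
    println(addRecord(existing, 13, 3)) // [Session(start=5, end=20, count=6)]
}
```

In this model the merge step always runs before the record is added, which matches the merge() before update() order seen in the logs.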

Does this make sense?

Thanks for the detailed reply.

I think I follow what you’re saying, but in that case the aggregation being sent to the output topic should be updating but it’s not.

Here’s a simple example with a Double accumulator of what I mean. It simply adds 2 to totalDistance for each new record received.

    private val aggregatedStreams = distance.aggregate(
        { 0.0 },
        { _, trip, totalDistance: Double -> totalDistance + 2 },
        { _, leftAggValue, rightAggValue -> leftAggValue + rightAggValue },
        Materialized.`as`<String, Double, SessionStore<Bytes, ByteArray>>("stateStoreName")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
            .withLoggingDisabled())

As you can see here the value for totalDistance being sent to the output topic is increasing as expected with each record. This is the behaviour I am aiming for, to keep a running total.

That is not happening with the aggClass above. The aggregation doesn’t seem to be working. Even if I do a mock example, where the aggClass simply increments a variable each time, it doesn’t accumulate. If the variable is incremented by 1 each time, the value sent to the output topic is always one, regardless of how many records are sent within the window. Am I missing something here?

I am not 100% sure right now what the issue could be. Could it be related to passing around references instead of making a deep copy of the AggClass object?
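One way to rule out shared references is to make the aggregate immutable, so that update and merge return new objects instead of mutating `this`. The following is a minimal sketch under assumptions, not the poster's actual class: TripAgg and its count field are hypothetical, and count stands in for the distance calculation that was elided from the post.

```kotlin
// Hypothetical immutable aggregate; field names follow the thread's AggClass.
data class TripAgg(
    val startLat: Double? = null,
    val startLng: Double? = null,
    val endLat: Double? = null,
    val endLng: Double? = null,
    val count: Int = 0 // hypothetical stand-in for the elided distance logic
) {
    // Returns a new instance; the receiver is left unchanged.
    fun update(lat: Double, lng: Double): TripAgg = copy(
        startLat = startLat ?: lat,
        startLng = startLng ?: lng,
        endLat = lat,
        endLng = lng,
        count = count + 1
    )

    // Combines two sessions, treating an empty side as "nothing to merge".
    fun merge(latest: TripAgg): TripAgg = when {
        startLat == null -> latest
        latest.startLat == null -> this
        else -> copy(endLat = latest.endLat, endLng = latest.endLng, count = count + latest.count)
    }
}

fun main() {
    val empty = TripAgg()
    val first = empty.update(2.424, 12.325)
    val second = first.update(3.0, 13.0)
    println(empty)                  // still all nulls, count=0
    println(second)                 // start from the first record, end from the second
    println(TripAgg().merge(second) == second) // true
}
```

With this shape, an empty session merged with an existing one yields the existing one, regardless of which of the two calls happens first.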

Maybe you can set a breakpoint in KStreamSessionWindowAggregate.java to see what it is doing in detail?

From the code you can see that after the aggregator is called, its result is put into the state store and forwarded downstream: kafka/KStreamSessionWindowAggregate.java at 765e588bdd29d9f971ad26c945a3436688be9f66 · apache/kafka · GitHub

Apologies I forgot to reply to this.

It is all resolved and working now. The issue was in the deserializer used by the store. The wrong constructor for the AggClass was being called in the deserializer and so the values were incorrect.
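For readers who hit the same symptom, here is a minimal sketch of how a deserializer that calls the wrong constructor produces the all-null output seen earlier. Everything in it is hypothetical: Agg is a two-field stand-in for AggClass, the comma-separated wire format is made up, and the functions are plain Kotlin rather than the Kafka Serializer and Deserializer interfaces.

```kotlin
// Hypothetical stand-in for the thread's AggClass, reduced to two fields.
class Agg(var startLat: Double? = null, var totalDistance: Double? = null) {
    override fun toString() = "Agg(startLat=$startLat, totalDistance=$totalDistance)"
}

// Hypothetical wire format: the two fields joined by a comma.
fun serialize(agg: Agg): ByteArray =
    "${agg.startLat},${agg.totalDistance}".encodeToByteArray()

// Buggy variant: ignores the bytes and calls the no-arg constructor,
// so every value read back from the store looks freshly initialized.
fun deserializeBuggy(bytes: ByteArray): Agg = Agg()

// Fixed variant: parses the bytes and passes the fields to the constructor.
fun deserialize(bytes: ByteArray): Agg {
    val parts = bytes.decodeToString().split(",")
    return Agg(parts[0].toDoubleOrNull(), parts[1].toDoubleOrNull())
}

fun main() {
    val stored = serialize(Agg(2.424, 5245.0))
    println(deserializeBuggy(stored)) // Agg(startLat=null, totalDistance=null)
    println(deserialize(stored))      // Agg(startLat=2.424, totalDistance=5245.0)
}
```

A round-trip check like this on the store's serde, outside of the topology, is a quick way to confirm that an aggregate survives being written to and read from the state store.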

Thanks for the help!

