I am using Kotlin, trying to follow the example here (in Java).
private val distance = trips
.mapValues { _, trip ->
objectMapper.readValue(trip, Trip::class.java)
}.peek { _, trip -> logger.info("Trip",trip) }
.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10)).grace(Duration.ofSeconds(0)))
private val aggregatedStreams = distance.aggregate(
::AggClass,
{ key, trip, agg -> agg.update(trip.startLat,trip.startLng,trip.endLat, trip.endLng)}, { key, oldAgg, updatedAgg ->
oldAgg.merge(updatedAgg)
},
Materialized.`as`<String, AggClass, SessionStore<Bytes, ByteArray>>("stateStoreName")
.withKeySerde(Serdes.String())
.withValueSerde(tripSerDes())
.withLoggingDisabled()).suppress(Suppressed.untilWindowCloses(unbounded()))
The issue is with the aggreate()
function. I can see that the agg.update
method is called and returns the correct values. However values in the aggregator are never updated. At the end the output from the aggregation is the same as the initial values for the class.
Have I made some mistake in the syntax? When using simple aggregators like Double
it worked fine.
The aggregation class looks like this:
class AggClass {
private var startLat: Double? = null
private var endLat: Double? = null
private var startLng: Double? = null
private var endLng: Double? = null
var totalDistance: Double? = null
constructor() {}
constructor(startLat: Double?, startLng: Double?, endLat: Double?, endLng: Double?, totalDistance: Double?) {
this.startLat = startLat
this.endLat = endLat
this.startLng = startLng
this.endLng = endLng
this.totalDistance = totalDistance
}
fun update(lat: Double?, lng: Double?, totalDistance: Double?): AggClass {
println(this)
if (this.startLat == null && this.startLng == null) {
this.startLat = lat
this.startLng = lng
}
this.endLat = lat
this.endLng = lng
}
// Removed some of the calculations here
println("------")
println(this)
return this
}
fun merge(latest: AggClass): AggClass {
return if (this.startLng == null) latest else AggClass(this.startLat, this.startLng, latest.endLat, latest.endLng, latest.totalDistance)
}
Printing out the class from the update
function shows the instance variables are being updated, Eg.
AggClass(startLat=2.424, endLat=2.424, startLng=12.325, endLng=12.325, totalDistance=5245.0)
However the final output is always
AggClass(startLat=null, endLat=null, startLng=null, endLng=null, totalDistance=null)