Hi, this is my first post here, so I hope everything follows the etiquette. I also asked the same question on Stack Overflow (apache kafka streams - How to perform windowed aggregations on top of windowed aggregations - Stack Overflow), but unfortunately did not get any feedback there.
Tl;dr:
- Why is there no analogue of `TimeWindowedKStream` for `KTable`?
- How can I do a windowed aggregation of a `KTable` with the current API?
  - a) Map `Windowed<MyKey>` to the new timescale and treat this as a normal aggregation
  - b) Convert the `KTable` to a stream and do the state management manually
  - c) Something else
  - d) Please don’t
We have an infinite-retention topic in Kafka with “financial transactions”, which is a stream of events that contain a `BigDecimal` revenue value and some metadata. To speed up our reporting, we would like to pre-aggregate these revenues over different window durations: hour, day, month, and year.
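The aggregation itself is pretty standard:

```java
// `transactions` (KStream<?, FinancialTransaction>) is an assumed name for the cleaned-up source stream
KTable<Windowed<MyKey>, BigDecimalSummaryStatistics> hourlyRevenue = transactions
        // re-key by the (abbreviated) business key
        .groupBy((key, value) -> new MyKey(value.getA(), value.getB()),
                Grouped.with(myKeySerde, financialTransactionSerde))
        // tumbling windows of WINDOW_DURATION, late records accepted within GRACE_PERIOD
        .windowedBy(TimeWindows.of(WINDOW_DURATION).grace(GRACE_PERIOD))
        .aggregate(BigDecimalSummaryStatistics::new,
                (aggKey, newValue, aggValue) -> {
                    aggValue.accept(newValue.getRevenue());
                    return aggValue;
                },
                Materialized.<MyKey, BigDecimalSummaryStatistics, WindowStore<Bytes, byte[]>>with(myKeySerde, revenueSerde)
                        .withRetention(RETENTION_PERIOD));
```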
`BigDecimalSummaryStatistics` is a custom class analogous to `java.util.DoubleSummaryStatistics`, because we are also interested in `min`, `max` and `count`. The name of the key is heavily abbreviated for readability.
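For readers who have not used `java.util.DoubleSummaryStatistics`, a `BigDecimal` analogue could look roughly like the following. This is a minimal sketch, not the exact class from our code base: the method names mirror `DoubleSummaryStatistics`, the `combine` method is an assumption (it is what merging hourly results into coarser windows would need), and `min`/`max` stay `null` while empty because `BigDecimal` has no infinity.

```java
import java.math.BigDecimal;

/** Sketch of a BigDecimal analogue of java.util.DoubleSummaryStatistics (names assumed). */
class BigDecimalSummaryStatistics {
    private long count = 0;
    private BigDecimal sum = BigDecimal.ZERO;
    private BigDecimal min = null; // null until the first value arrives
    private BigDecimal max = null;

    /** Records one revenue value. */
    public void accept(BigDecimal value) {
        count++;
        sum = sum.add(value);
        min = (min == null) ? value : min.min(value);
        max = (max == null) ? value : max.max(value);
    }

    /** Merges another statistics object into this one, e.g. several hours into one day. */
    public void combine(BigDecimalSummaryStatistics other) {
        if (other.count == 0) {
            return; // nothing to merge, and other.min/other.max are null
        }
        count += other.count;
        sum = sum.add(other.sum);
        min = (min == null) ? other.min : min.min(other.min);
        max = (max == null) ? other.max : max.max(other.max);
    }

    public long getCount() { return count; }
    public BigDecimal getSum() { return sum; }
    public BigDecimal getMin() { return min; }
    public BigDecimal getMax() { return max; }
}
```

Note that `BigDecimal.equals` also compares scale, so comparisons on these values are best done with `compareTo`.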
(Comments on the code are always appreciated.)
The output is a `KTable`, which we write into a compacted topic. This “hourly-revenue” topic is written with `Produced.with(new WindowedSerdes.TimeWindowedSerde<>(myKeySerde), revenueSerde)`.
Now to the fun part: I would like to perform the aggregations for coarser timescales on the output of the hourly aggregation, for several reasons:
- Workload: If the topic is already compacted or we decide to use suppression, there are far fewer events to process.
- Schema: The source topic is quite messed up (think multiple schemas and no schema registry), outside our control to modify, and too large to persist in a “clean” topic. Currently we deserialize it as `byte[]` and have a huge and costly `Transformer` at the beginning of the topology that unifies the schema in place (feedback appreciated). So it would be a huge advantage to only go through that mess once.
However, for `KTable` there is no analogue of `TimeWindowedKStream`. A graphic in the developer guide illustrates this point rather explicitly: `KGroupedTable` aggregations are always non-windowed.
The fact that this is ruled out so explicitly makes me believe I am overlooking something, but I don’t see a conceptual problem. For a windowed aggregation I need to maintain a `WindowStore` with all events that fall into a window. Couldn’t this be fed from a `KTable` with an upsert instead of an insert just as well? Is there a reason why windowing on a `KTable` cannot work?
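To make the “upsert instead of insert” idea concrete, here is a minimal in-memory model in plain Java; it is not Kafka Streams code and not how the library works internally. The class name is hypothetical, days are UTC days for simplicity, and grace periods and retention are ignored. Each (key, hour) slot keeps only its latest value, so a correction to an hour replaces the old value instead of being counted twice.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of a window store fed by upserts from an hourly KTable. */
class UpsertDailyAggregator {
    private static final long HOUR_MS = 3_600_000L;
    private static final long DAY_MS = 24 * HOUR_MS; // UTC days (simplifying assumption)

    // key -> (hourStartMs -> latest hourly revenue)
    private final Map<String, TreeMap<Long, BigDecimal>> latestHourly = new HashMap<>();

    /** Upsert: overwrites whatever was stored for this key and hour. */
    public void upsert(String key, long hourStartMs, BigDecimal hourlyRevenue) {
        latestHourly.computeIfAbsent(key, k -> new TreeMap<>()).put(hourStartMs, hourlyRevenue);
    }

    /** Recomputes the daily total from the latest hourly values of that day. */
    public BigDecimal dailyTotal(String key, long anyTimeInDayMs) {
        long dayStart = Math.floorDiv(anyTimeInDayMs, DAY_MS) * DAY_MS;
        TreeMap<Long, BigDecimal> hours = latestHourly.get(key);
        if (hours == null) {
            return BigDecimal.ZERO;
        }
        BigDecimal total = BigDecimal.ZERO;
        // [dayStart, dayStart + DAY_MS) selects exactly the hours belonging to that day
        for (BigDecimal v : hours.subMap(dayStart, true, dayStart + DAY_MS, false).values()) {
            total = total.add(v);
        }
        return total;
    }
}
```

Because the daily value is recomputed from the latest hourly values rather than by adding and subtracting deltas, the same approach would also work for non-invertible statistics such as `min` and `max`.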
And finally: How can I work around this? I see two ways:
- Map `Windowed<MyKey>` to `Windowed<MyKey>` and replace the `startMs` with `windowed.window().startTime().atZone(GERMAN_TIME_ZONE_ID).truncatedTo(ChronoUnit.DAYS).toInstant().toEpochMilli()`. In this case, I need to write a custom transformer to properly implement the grace period, and I also have to use a `KeyValueStore` rather than a `WindowStore`. In essence, I need to manually do everything `windowedBy` would do under the hood. I already implemented this and it passes all integration tests, but it feels like it's not supposed to be done this way.
- Convert the `KTable` back to a `KStream` and use a windowed aggregation again. In this case I would need to keep the state of the stream (the latest value per key) saved somewhere, maybe in a `HashMap` inside the aggregate function, to implement the semantics of an update stream. I tried this before. It was slow, buggy and extremely hard to maintain, but it worked. I find this solution terrible because the whole reason `KTable` exists is so that I don’t have to do something like this.
Otherwise I could just bite the bullet and do the aggregation on the raw data directly. I have already described the drawbacks above.
To summarize again:
- Why is there no analogue of `TimeWindowedKStream` for `KTable`?
- How can I do a windowed aggregation of a `KTable` with the current API?
  - a) Map `Windowed<MyKey>` to the new timescale and treat this as a normal aggregation
  - b) Convert the `KTable` to a stream and do the state management manually
  - c) Something else
  - d) Please don’t
I would appreciate all feedback and ideas!