Sum data in "cascading" windows

Hi, this is my first post here, so I hope everything is according to etiquette :slight_smile: I also asked the same question on Stack Overflow (“apache kafka streams - How to perform windowed aggregations on top of windowed aggregations”) but unfortunately got no feedback over there.

Tl;dr:

  1. Why is there no analogue of TimeWindowedKStream for KTable?
  2. How can I do a windowed aggregation of a KTable with the current API?
    • a) Map Windowed<MyKey> to the new timescale and treat this as a normal aggregation
    • b) Convert the KTable to a KStream and do the state management manually
    • c) Something else
    • d) Please don’t

We have an infinite-retention topic in Kafka with “financial transactions”: a stream of events, each containing a BigDecimal revenue value and some metadata. To speed up our reporting, we would like to pre-aggregate these revenues over different window durations: hour, day, month, and year.

The calculation of the events is pretty standard:

  .groupBy((key, value) -> new MyKey(value.getA(), value.getB()),
          Grouped.with(myKeySerde, financialTransactionSerde))
  .windowedBy(TimeWindows.of(WINDOW_DURATION).grace(GRACE_PERIOD))
  .aggregate(BigDecimalSummaryStatistics::new,
          (aggKey, newValue, aggValue) -> {
              aggValue.accept(newValue.getRevenue());
              return aggValue;
          },
          Materialized.<MyKey, BigDecimalSummaryStatistics, WindowStore<Bytes, byte[]>>
                  with(myKeySerde, revenueSerde)
                  .withRetention(RETENTION_PERIOD))

BigDecimalSummaryStatistics is a custom class analogous to java.util.DoubleSummaryStatistics, because we are also interested in min, max, and count. The key name is heavily abbreviated for readability.
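For context, a minimal sketch of such a class (modeled on java.util.DoubleSummaryStatistics; our real implementation differs in details):

```java
import java.math.BigDecimal;
import java.util.function.Consumer;

// Sketch of a BigDecimal analogue of java.util.DoubleSummaryStatistics:
// accumulates count, sum, min, and max. Illustrative only.
class BigDecimalSummaryStatistics implements Consumer<BigDecimal> {
    private long count;
    private BigDecimal sum = BigDecimal.ZERO;
    private BigDecimal min;
    private BigDecimal max;

    @Override
    public void accept(BigDecimal value) {
        count++;
        sum = sum.add(value);
        min = (min == null) ? value : min.min(value);
        max = (max == null) ? value : max.max(value);
    }

    long getCount() { return count; }
    BigDecimal getSum() { return sum; }
    BigDecimal getMin() { return min; }
    BigDecimal getMax() { return max; }
}
```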

(Comments on the code are always appreciated.)

The output is obviously a KTable, which we write into a compacted topic. This “hourly-revenue” topic is written with

            Produced.with(new WindowedSerdes.TimeWindowedSerde<>(myKeySerde), revenueSerde)

Now to the fun part: I would like to perform the aggregations for coarser timescales on the output of the hourly aggregation. There are several reasons for this:

  • Workload: If the topic is already compacted, or we decide to use suppression, the number of events to process is far smaller.
  • Schema: The source topic is quite messed up (think multiple schemas and no schema registry), outside our control to modify, and too large to persist in a “clean” topic. Currently we deserialize it as byte[] and run a huge, costly Transformer at the beginning of the topology that unifies the schema in place (feedback appreciated). So it would be a huge advantage to only go through that mess once.

However, for KTable there is no analogue of [TimeWindowedKStream][1]. The graphic below is taken from the developer guide and illustrates this point rather explicitly: KGroupedTable aggregations are

Always non-windowed

The fact that this is ruled out so explicitly makes me believe there is something I am overlooking, but I don’t see a conceptual problem. For a windowed aggregation I need to maintain a WindowStore with all events that fall into a window. Couldn’t this just as well be fed from a KTable, with an upsert instead of an insert? Is there a reason why windowing on a KTable cannot work?

And finally: How can I work around this? I see two ways:

  1. Map Windowed<MyKey> to a new Windowed<MyKey>, replacing the startMs with
windowed.window().startTime().atZone(GERMAN_TIME_ZONE_ID).truncatedTo(ChronoUnit.DAYS).toInstant().toEpochMilli()
In this case I need to write a custom transformer to properly implement the grace period, and I also have to use a `KeyValueStore` rather than a `WindowStore`. In essence, I need to manually do everything `windowedBy` would do under the hood. I already implemented this and it passes all integration tests, but it feels like it's not supposed to be done this way.
  2. Convert the KTable back to a KStream and use a windowed aggregation again. In this case I would need to keep the state of the KStream saved somewhere, maybe a HashMap inside the aggregate function, to implement the semantics of an update stream. I tried this before: it was slow, buggy, and extremely hard to maintain, but it worked. I find this solution terrible, because the only reason KTable exists is so that I don’t have to do something like this.
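For what it's worth, the truncation step in option 1 is plain java.time; a minimal sketch (the zone constant matches my snippet above, the class and method names are mine):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;

// Sketch: map an hourly window's start timestamp to the start of its
// calendar day in the German time zone. This is the key-truncation step
// from option 1; names are illustrative.
class WindowTruncation {
    static final ZoneId GERMAN_TIME_ZONE_ID = ZoneId.of("Europe/Berlin");

    static long toDayStartMs(long windowStartMs) {
        return Instant.ofEpochMilli(windowStartMs)
                .atZone(GERMAN_TIME_ZONE_ID)
                .truncatedTo(ChronoUnit.DAYS)
                .toInstant()
                .toEpochMilli();
    }
}
```

Note that truncating in a fixed zone means the daily windows align with German calendar days, not UTC days, which is exactly what our reporting needs.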

Otherwise I could just bite the bullet and do all the aggregations on the raw data directly; I described the drawbacks of that above.


To summarize again:

  1. Why is there no analogue of TimeWindowedKStream for KTable?
  2. How can I do a windowed aggregation of a KTable with the current API?
    • a) Map Windowed<MyKey> to the new timescale and treat this as a normal aggregation
    • b) Convert the KTable to a KStream and do the state management manually
    • c) Something else
    • d) Please don’t

I would appreciate all feedback and ideas!

Why is there no analogue of TimeWindowedKStream for KTable?

I guess you refer to the “roll-up” of smaller time windows into larger time windows. It’s a gap in the API, and given the current structure of the code base, unfortunately not easy to add. The problem is that the “windowed KTable” is a kind of hybrid between a stream and a table, but the KTable API only models the “table part” and was not designed to handle the “stream part”. We hope to address this eventually.

How can I do a windowed aggregation of a KTable with the current API?

The simplest thing to do might be to express multiple aggregations against the same input stream, using different time windows. I.e., instead of “rolling up”, just compute the different results in parallel from the base data.
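To make the idea concrete without the Streams API: each granularity is summed independently from the base events, not derived from a finer one. A plain-Java illustration (names and window sizes are illustrative; in Kafka Streams this corresponds to calling windowedBy() twice on the same grouped stream):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: daily totals are computed from the raw events,
// not rolled up from the hourly totals.
class ParallelAggregation {
    // Sum values into fixed windows of the given size, keyed by window start.
    // Each event is a pair: e[0] = timestamp in ms, e[1] = value.
    static Map<Long, Long> sumByWindow(List<long[]> events, long windowMs) {
        Map<Long, Long> result = new TreeMap<>();
        for (long[] e : events) {
            long windowStart = (e[0] / windowMs) * windowMs;
            result.merge(windowStart, e[1], Long::sum);
        }
        return result;
    }
}
```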

Converting the KTable result into a KStream and trying to roll up won’t work, because the KTable would emit updates, but a KStream does not understand update semantics… Maybe if you used suppress() to avoid updates, a roll-up could work this way, though.


Thanks a lot for responding!

It’a a gap in the API, and given the current structure of the code base not easy to add unfortunately

That’s very good to know. Is there already a Jira Ticket that addresses these “gaps” in the API? I would be very interested in learning more about this.

KTable would emit updates, but a KStream does not understand update semantics

That’s clear. I would need to re-implement the update semantics inside the aggregate, which is precisely why I don’t want to do it. :smiley: I agree that suppress would work, but the latency is too high for this use case, and if it were not, I think batch processing would be the better option.

In the meantime I used KTable.map to map Windowed<MyKey> to a new key with the date truncated to days, aggregated these with a simple groupBy, and implemented a custom timestamp extractor with a grace parameter that skips records whose timestamp is older than the grace period. This at least passes all my integration tests and understands update semantics as you would expect.
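The grace logic I ended up with boils down to a check like this (a simplified sketch; class and method names are mine, not the actual implementation):

```java
// Sketch of the grace check described above: a record is skipped if its
// timestamp lags the highest timestamp seen so far by more than the
// grace period. Illustrative only.
class GraceFilter {
    private final long graceMs;
    private long maxObservedTs = Long.MIN_VALUE;

    GraceFilter(long graceMs) {
        this.graceMs = graceMs;
    }

    // Returns true if the record should be processed, false if it is too late.
    boolean accept(long recordTs) {
        maxObservedTs = Math.max(maxObservedTs, recordTs);
        return recordTs >= maxObservedTs - graceMs;
    }
}
```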

There is only a related ticket that I am aware of: https://issues.apache.org/jira/browse/KAFKA-6840 – compare the backing KIP, which explains why it’s difficult…