Aggregations on Windowed KTables

I would like to perform further operations on a windowed KTable. For background, I have a topic with records of the form {clientId, txTimestamp, txAmount}. From this topic I created a stream partitioned by clientId, with the record timestamp set to the txTimestamp event field. From this stream, I want to count the transactions per clientId in 1-hour tumbling windows. I do this with something similar to the following:
CREATE TABLE transactions_per_client
  WITH (kafka_topic='transactions_per_client_topic') AS
  SELECT clientId,
         COUNT(*) AS transactions_per_client,
         WINDOWSTART AS window_start,
         WINDOWEND AS window_end
  FROM transactions_stream
  WINDOW TUMBLING (SIZE 1 HOURS)
  GROUP BY clientId;

The aggregations work as expected and yield values similar to:

ClientId  Transactions_per_client  WindowStart  WindowEnd
1         12                       1            2
2         8                        1            2
1         24                       2            3
1         19                       3            4

What I want to do now is further process this table to add a column that represents the difference in number of transactions per client between 2 adjacent windows for the same client. For the previous table, that would be something like this:

ClientId  Transactions_per_client  WindowStart  WindowEnd  Deviation
1         12                       1            2          0
2         8                        1            2          0
1         24                       2            3          12
1         19                       3            4          -5

What would be the best way to achieve this (using either Kafka Streams or ksqlDB)? I tried to create this column with a user-defined aggregate function (UDAF), but it can only be applied to a KStream, not to a KTable.

Windowed tables are something of a “dead end” in ksqlDB at the moment. In Kafka Streams, too, you cannot really use the DSL to process the data further (it’s a known issue).

I would recommend reading the table's output topic with the Processor API (PAPI) and using a stateful processor to do the processing.
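To make that suggestion a bit more concrete, here is a minimal sketch of the per-record logic such a stateful processor could run. It is plain Java with no Kafka dependency: the class name DeviationTracker, its update method, and the HashMap are all hypothetical stand-ins, and in a real processor the map would be replaced by a fault-tolerant state store. It also assumes that updates for a given client arrive in window order, and it compares each window against the last window seen for that client, which is only the adjacent window if the client had transactions in every window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical helper: computes the change in count between consecutive windows per client. */
class DeviationTracker {

    /** Per-client state; in a real processor this would live in a state store. */
    private static final class ClientState {
        long windowStart;    // start of the most recent window seen for this client
        long count;          // latest count received for that window
        Long previousCount;  // last count of the window before it, null if there was none
    }

    // Stand-in for a key-value state store keyed by clientId (assumption).
    private final Map<String, ClientState> store = new HashMap<>();

    /**
     * Handles one update from the windowed table.
     * Returns the deviation from the previous window, 0 for a client's first window,
     * or empty if the update belongs to a window older than the current one.
     */
    OptionalLong update(String clientId, long windowStart, long count) {
        ClientState state = store.get(clientId);
        if (state == null) {
            // First window ever seen for this client: nothing to compare against.
            state = new ClientState();
            state.windowStart = windowStart;
            state.count = count;
            store.put(clientId, state);
            return OptionalLong.of(0L);
        }
        if (windowStart < state.windowStart) {
            // Late update for an older window: ignored in this sketch.
            return OptionalLong.empty();
        }
        if (windowStart > state.windowStart) {
            // A new window started: the old current count becomes the baseline.
            state.previousCount = state.count;
            state.windowStart = windowStart;
        }
        // Same or new window: a windowed table emits a new count on every change,
        // so the current count is simply overwritten.
        state.count = count;
        long deviation = state.previousCount == null ? 0L : count - state.previousCount;
        return OptionalLong.of(deviation);
    }
}
```

Feeding it the rows from the question in order, for example update("1", 1, 12), update("1", 2, 24), update("1", 3, 19), gives the deviations 0, 12 and -5 for client 1. Because the table emits intermediate counts while a window is still open, the deviation for the current window is revised with each update rather than emitted once.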
