Running multiple aggregations against a topic

I am trying to build an application that will satisfy the following need.
Let’s assume that I am getting price values for a given item (key) on an input topic.
I have two different conditions to check:
If the average price of an item is less than x for n seconds, I want to generate a low price alert for the item.
If the average price of an item is above y for m seconds, I want to generate a high price alert for the item.
Can I define this using a single Streams application? Or is the only way to define multiple topologies or applications?
Also, I want to interrupt the aggregation for a given item based on some external event. How do I indicate to Streams that processing of a given item is no longer needed once that event arrives? Thanks in advance.

Hi @send2r and welcome!

Two options come to my mind:

  1. You can split your stream into multiple branches, for example:
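// builder is a StreamsBuilder; "input-topic" is a placeholder topic name
final KStream<String, String> stream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));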
stream.print(Printed.<String, String>toSysOut().withLabel("Branch 1:"));
stream.print(Printed.<String, String>toSysOut().withLabel("Branch 2:"));

This code prints each record twice, once prefixed with Branch 1: and once prefixed with Branch 2:. In place of the print operations you would put your two windowed aggregations, each followed by a filter that decides whether an alert is issued.

  2. You could use KStream#transform() or KStream#transformValues(), which allow you to maintain a state store and to emit records when your conditions are satisfied.
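
Whichever option you pick, the per-item logic is the same: average the prices in a time window, then compare the average to a threshold. Here is a minimal plain-Java sketch of that logic, with no Kafka dependency so it can be run on its own. The class `PriceAlertSketch`, its method names, the `item@windowStart` key format, and the choice of fixed non-overlapping windows are all assumptions for illustration. In a Kafka Streams topology the windowing would be done by the library (for example `groupByKey()` followed by `windowedBy(...)` and `aggregate(...)`), not by hand like this.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the alert logic (no Kafka dependency).
// Every name in this class is hypothetical and for illustration only.
class PriceAlertSketch {

    // One price observation: item key, price, and event time in milliseconds.
    static final class Price {
        final String item;
        final double value;
        final long timestampMs;

        Price(String item, double value, long timestampMs) {
            this.item = item;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Average price per (item, window), keyed as "item@windowStartMs".
    // Assumption: fixed, non-overlapping (tumbling) windows.
    static Map<String, Double> windowedAverages(List<Price> prices, long windowSizeMs) {
        Map<String, double[]> sumAndCount = new LinkedHashMap<>();
        for (Price p : prices) {
            long windowStart = (p.timestampMs / windowSizeMs) * windowSizeMs;
            double[] acc = sumAndCount.computeIfAbsent(p.item + "@" + windowStart, k -> new double[2]);
            acc[0] += p.value; // running sum
            acc[1] += 1;       // running count
        }
        Map<String, Double> averages = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    // Branch 1: low price alert when the n-second window average is below x.
    static List<String> lowAlerts(List<Price> prices, long nSeconds, double x) {
        List<String> alerts = new ArrayList<>();
        for (Map.Entry<String, Double> e : windowedAverages(prices, nSeconds * 1000).entrySet()) {
            if (e.getValue() < x) {
                alerts.add("LOW " + e.getKey());
            }
        }
        return alerts;
    }

    // Branch 2: high price alert when the m-second window average is above y.
    static List<String> highAlerts(List<Price> prices, long mSeconds, double y) {
        List<String> alerts = new ArrayList<>();
        for (Map.Entry<String, Double> e : windowedAverages(prices, mSeconds * 1000).entrySet()) {
            if (e.getValue() > y) {
                alerts.add("HIGH " + e.getKey());
            }
        }
        return alerts;
    }
}
```

Both methods read the same input list, which mirrors the two branches reading the same stream. Note that n and m can differ, so each branch uses its own window size.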

Regarding the interruption: if the external events can be read from a Kafka topic, you could read them with KStream#transform() or KStream#transformValues() (KStream#process() might also work) and set a flag in a state store. Another KStream#transformValues() operation that accesses the same state store can then read the flag.
You could also consider enriching your aggregated event with the flag to avoid the state store.
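
The flag idea can also be modeled in plain Java. This is only a rough sketch under assumptions: the class `StopFlagSketch` and its methods are hypothetical, and the `HashMap` stands in for the shared state store that one operation writes and another reads. It is not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the "stop flag" approach (no Kafka dependency).
// All names are hypothetical; the map stands in for a shared state store.
class StopFlagSketch {

    // item key -> true once an external stop event has been seen for it
    private final Map<String, Boolean> stopped = new HashMap<>();

    // Records that were passed on to the (not shown) aggregation step.
    private final List<String> forwarded = new ArrayList<>();

    // Called for each external event: flag the item as no longer needed.
    void onStopEvent(String item) {
        stopped.put(item, Boolean.TRUE);
    }

    // Called for each price record: drop it if the item has been flagged.
    void onPrice(String item, double price) {
        if (Boolean.TRUE.equals(stopped.get(item))) {
            return; // processing for this item was interrupted
        }
        forwarded.add(item + "=" + price);
    }

    List<String> forwarded() {
        return forwarded;
    }
}
```

Prices that arrive before the stop event are still forwarded; only later prices for the flagged item are dropped. Other items are unaffected.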