I am trying to build an application that satisfies the following need.
Let’s assume that I am getting price values for a given item (key) on an input topic.
I have two different conditions to check:
- If the average price of an item is below x for n seconds, I want to generate a low price alert for the item.
- If the average price of an item is above y for m seconds, I want to generate a high price alert for the item.
Can I define this in a single Streams application, or is the only way to define multiple topologies or applications?
Also, I want to interrupt the aggregation for a given item based on some external event. How can I indicate to Streams that processing of a given item is no longer needed once that event occurs? Thanks in advance.
Hi @send2r and welcome!
Two options come to my mind:
- You can split your stream into multiple branches, for example:
final KStream<String, String> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
stream.print(Printed.<String, String>toSysOut().withLabel("Branch 1:"));
stream.print(Printed.<String, String>toSysOut().withLabel("Branch 2:"));
This code will print each record twice, once prefixed with "Branch 1:" and once prefixed with "Branch 2:". Instead of the print operation, you could have your windowed aggregations, and a filter after each aggregation could decide whether an alert is issued.
- You could use KStream#transform() or KStream#transformValues(), which allow you to maintain a state store and to issue records when your conditions are satisfied.
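To make the "aggregate, then filter" idea concrete, here is a minimal, dependency-free sketch of the logic each branch would carry. It is plain Java rather than Kafka Streams code: the class PriceAlertSketch, its methods, and the tumbling-window bucketing are hypothetical names and assumptions chosen for illustration, and x, y and the window length stand in for the thresholds and durations from the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of one branch: a per-item average over tumbling windows,
// followed by a filter that decides whether an alert is emitted.
final class PriceAlertSketch {

    // Running sum and count for one (item, window) pair.
    static final class Avg {
        double sum;
        long count;

        void add(double price) {
            sum += price;
            count++;
        }

        double value() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    private final long windowMs;
    private final Map<String, Avg> windows = new HashMap<>();

    PriceAlertSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Adds a price to the tumbling window containing timestampMs
    // and returns the updated average for that item and window.
    double record(String item, long timestampMs, double price) {
        long windowStart = timestampMs - (timestampMs % windowMs);
        Avg avg = windows.computeIfAbsent(item + "@" + windowStart, k -> new Avg());
        avg.add(price);
        return avg.value();
    }

    // Filter for the low-price branch: alert only if the average is below x.
    static Optional<String> lowAlert(String item, double average, double x) {
        return average < x
                ? Optional.of("LOW price alert for " + item)
                : Optional.empty();
    }

    // Filter for the high-price branch: alert only if the average is above y.
    static Optional<String> highAlert(String item, double average, double y) {
        return average > y
                ? Optional.of("HIGH price alert for " + item)
                : Optional.empty();
    }
}
```

With two instances, one using an n-second window and one using an m-second window, both conditions can be evaluated side by side on the same input, which mirrors the two branches in a single topology.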
Regarding the interruption: if the external events can be read from a Kafka topic, you could read these events with KStream#transform() or KStream#transformValues() (maybe KStream#process() would also work) and set a flag in a state store that can be read from another KStream#transformValues() operation that also accesses the same state store.
You could also consider enriching your aggregated event with the flag to avoid the state store.
Best,
Bruno
Thank you very much for the suggestions. I did not quite follow the last part regarding suspending/stopping streams processing based on an external event though. Is there an example somewhere that you could point me to?
Also, is it safe to assume that I can scale this topology by increasing the partitions of my input topic? Is the state maintained even when some rebalancing occurs?
I am not aware of an example of exactly this. You can find general Streams examples in the confluentinc/kafka-streams-examples repository on GitHub (demo applications and code examples for Apache Kafka's Streams API).
In Streams, you can share a state store across multiple operations. See the javadocs for KStream#transform() (KStream, Kafka 2.8.1 API) for how to connect a state store. The same applies to KStream#process(), KStream#transformValues(), KStream#flatTransform(), and KStream#flatTransformValues().
If your external events can be produced to a Kafka topic, you could read the external events from that topic and process them with one of the operations mentioned above. Let’s assume you use KStream#process(). In process(), you can set a flag in a state store each time you read an external event that should interrupt the aggregation. If you connect that same state store to, let’s say, a KStream#transformValues() placed before the aggregation, you can read the flag from the shared state store and only pass events on to the aggregation if the flag is not set. That is just one idea; there are surely other ways to achieve what you want.
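The flag idea can be sketched without any Kafka dependency. In the plain-Java model below, a HashMap stands in for the shared state store, onExternalEvent plays the role of the process() step, and beforeAggregation plays the role of the transformValues() step in front of the aggregation. The class and method names are hypothetical, and the sketch assumes that external events and prices are both keyed by item so that the flag and the prices for one item are handled together.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of two operations sharing one state store.
final class InterruptGateSketch {

    // Stand-in for the shared state store: item -> "aggregation interrupted" flag.
    private final Map<String, Boolean> flagStore = new HashMap<>();

    // Role of process(): an external event for an item sets its flag.
    void onExternalEvent(String item) {
        flagStore.put(item, Boolean.TRUE);
    }

    // Role of transformValues() before the aggregation: forward the price
    // only while the flag for this item is not set.
    Optional<Double> beforeAggregation(String item, double price) {
        boolean interrupted = flagStore.getOrDefault(item, Boolean.FALSE);
        return interrupted ? Optional.empty() : Optional.of(price);
    }
}
```

Records that come back empty would simply be dropped before the aggregation, so the aggregate for a flagged item stops changing while other items continue to be processed.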
Also, is it safe to assume that I can scale this topology by increasing the partitions of my input topic? Is the state maintained even when some rebalancing occurs?
Yes, it is safe.