Average for each window

Hi all,
I’m new to Kafka Streams and I’m really going crazy. I have a stream of counter values represented as <counter-name, counter-value, timestamp>. I want to calculate the average value of each counter for each day, like this:

counterValues topic content:

"cpu", 10, "2022-06-03 17:00"
"cpu", 20, "2022-06-03 18:00"
"cpu", 30, "2022-06-04 10:00"
"memory", 40, "2022-06-04 10:00"

and I want to obtain this output:

"cpu", "2022-06-03", 15
"cpu", "2022-06-04", 30
"memory", "2022-06-04", 40

This is a snippet of my code, which doesn’t work (it seems to calculate a count)…

    counterValueStream
        .groupByKey()
        .windowedBy(tumblingWindow)
        .aggregate(StatisticValue::new, (k, counterValue, statisticValue) -> {
            statisticValue.setSamplesNumber(statisticValue.getSamplesNumber() + 1);
            statisticValue.setSum(statisticValue.getSum() + counterValue.getValue());
            return statisticValue;
        }, Materialized.with(Serdes.String(), statisticValueSerde))
        .toStream()
        .map((Windowed<String> key, StatisticValue sv) -> {
            double avgNoFormat = sv.getSum() / (double) sv.getSamplesNumber();
            double formattedAvg = Double.parseDouble(String.format("%.2f", avgNoFormat));
            return new KeyValue<>(key.key(), formattedAvg);
        })
        .to("average", Produced.with(Serdes.String(), Serdes.Double()));

Can anyone help me, please?
Thanks in advance!

Please check out Compute an average aggregation using Kafka Streams

Hi, I wrote the code starting from the project that you have mentioned…
The window is the following:

		Duration windowSize = Duration.ofDays(1);
		TimeWindows tumblingWindow = TimeWindows.of(windowSize);
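
For reference, here is a minimal plain-Java sketch (no Kafka Streams involved) of how I understand a one-day tumbling window would bucket these timestamps. It assumes that time windows are aligned to the epoch, so a one-day window starts at midnight UTC, and that the counter timestamps are in UTC. The class and method names (WindowAlignment, windowStart, parseUtc) are made up for illustration:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class WindowAlignment {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm");

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(Math.floorDiv(ts, sizeMs) * sizeMs);
    }

    // Parses "yyyy-MM-dd HH:mm" text, assuming it is expressed in UTC.
    static Instant parseUtc(String text) {
        return LocalDateTime.parse(text, FORMAT).toInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        Duration day = Duration.ofDays(1);
        String[] samples = {"2022-06-03 17:00", "2022-06-03 18:00", "2022-06-04 10:00"};
        for (String sample : samples) {
            // Prints the start of the one-day window each sample falls into.
            System.out.println(sample + " -> " + windowStart(parseUtc(sample), day));
        }
    }
}
```

If the timestamps were in a local time zone instead, the day boundaries of the windows would be shifted by the UTC offset, so records close to midnight could land in a different day than expected.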

The content of the counterValue topic and the output are:

Note that I use a TimestampExtractor that uses the counter timestamp instead of the Kafka record timestamp.

I would like to have the following result:

"cpu", "2022-06-03", 15
"cpu", "2022-06-04", 30
"memory", "2022-06-04", 40

Maybe there is something I am not understanding about how windows work…

Thank you in advance!
Elena

Hi @miele975

There are a couple of things to consider here.

If you’re not getting the results you’d expect, I’d first look into your aggregation code, since Kafka Streams does nothing more than invoke the Aggregator.apply method you provide.
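
One way to check the aggregation logic in isolation is to run it outside Kafka Streams. The following is only a plain-Java sketch: StatisticValue here is a hypothetical stand-in for the class in the question (its real fields and types are not shown in the thread), and the "name|day" bucket key simply mimics grouping by key and by a one-day window:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class AverageSketch {
    // Hypothetical stand-in for the StatisticValue class from the question.
    static class StatisticValue {
        long samplesNumber;
        double sum;

        double average() {
            return sum / samplesNumber;
        }
    }

    // Same shape as the aggregator lambda: count one more sample, add its value.
    static StatisticValue apply(double counterValue, StatisticValue aggregate) {
        aggregate.samplesNumber += 1;
        aggregate.sum += counterValue;
        return aggregate;
    }

    // Averages per "name|day" bucket; each row is {name, value, "yyyy-MM-dd HH:mm"}.
    static Map<String, Double> dailyAverages(String[][] rows) {
        Map<String, StatisticValue> state = new LinkedHashMap<>();
        for (String[] row : rows) {
            String bucket = row[0] + "|" + row[2].substring(0, 10);
            StatisticValue current = state.computeIfAbsent(bucket, k -> new StatisticValue());
            apply(Double.parseDouble(row[1]), current);
        }
        Map<String, Double> result = new LinkedHashMap<>();
        state.forEach((bucket, value) -> result.put(bucket, value.average()));
        return result;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"cpu", "10", "2022-06-03 17:00"},
            {"cpu", "20", "2022-06-03 18:00"},
            {"cpu", "30", "2022-06-04 10:00"},
            {"memory", "40", "2022-06-04 10:00"},
        };
        dailyAverages(rows).forEach((bucket, avg) -> System.out.println(bucket + " -> " + avg));
    }
}
```

If this logic produces the expected averages for the sample data, the aggregator itself is probably fine and the problem is more likely in the timestamps, the windowing, or the serdes.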

One suggestion is to try using the Topology Test Driver (TTD) on your streams application. With the TTD you have complete control over the timestamps of your input records, which makes it easy to spot where the application is going sideways. The aggregation tutorial you looked at also contains a test you can use for guidance.

For the output, it seems like you want a final result for the day, i.e. when the window closes. Here’s an example of suppression from the Kafka Streams course on Confluent Developer. Keep in mind with suppress that Kafka Streams emits results only when incoming records drive the stream time forward; in other words, the final window results aren’t automatically emitted when the window closes.
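
To illustrate that last point, here is a small plain-Java simulation of stream-time-driven emission. It is not the Kafka Streams API (there you would use suppress with Suppressed.untilWindowCloses); it is only a sketch that assumes a single key, one-day epoch-aligned windows, and no grace period. The class name WindowCloseSketch and its methods are made up for illustration:

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

class WindowCloseSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Open windows for a single key: window start (ms) -> {sum, count}.
    private final TreeMap<Long, double[]> open = new TreeMap<>();
    // Highest record timestamp seen so far ("stream time").
    private long streamTime = Long.MIN_VALUE;

    // Feeds one record and returns the final averages (window start -> average)
    // of the windows that this record caused to close.
    Map<Long, Double> process(long timestampMs, double value) {
        Map<Long, Double> closed = new LinkedHashMap<>();
        long start = Math.floorDiv(timestampMs, DAY_MS) * DAY_MS;
        if (start + DAY_MS <= streamTime) {
            return closed; // record for an already closed window: dropped in this sketch
        }
        double[] aggregate = open.computeIfAbsent(start, k -> new double[2]);
        aggregate[0] += value;
        aggregate[1] += 1;
        streamTime = Math.max(streamTime, timestampMs);
        // A window is final only once stream time has reached its end.
        while (!open.isEmpty() && open.firstKey() + DAY_MS <= streamTime) {
            Map.Entry<Long, double[]> entry = open.pollFirstEntry();
            closed.put(entry.getKey(), entry.getValue()[0] / entry.getValue()[1]);
        }
        return closed;
    }

    static long ms(String iso) {
        return Instant.parse(iso).toEpochMilli();
    }

    public static void main(String[] args) {
        WindowCloseSketch sketch = new WindowCloseSketch();
        // The two records of 2022-06-03 do not produce a final result on their own.
        System.out.println(sketch.process(ms("2022-06-03T17:00:00Z"), 10));
        System.out.println(sketch.process(ms("2022-06-03T18:00:00Z"), 20));
        // Only a record from the next day moves stream time past the window end.
        System.out.println(sketch.process(ms("2022-06-04T10:00:00Z"), 30));
    }
}
```

In this simulation the result for 2022-06-04 is never emitted, because no later record arrives to advance stream time past the end of that window. The same effect is worth keeping in mind when testing with only a handful of records.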

Let me know if you have any questions.

HTH,
Bill
