Hi all,
I’m new to Kafka Streams and I’m really going crazy. I have a stream of counter values, each represented as <counter-name, counter-value, timestamp>. I want to calculate the average value of each counter for each day, like this:
counterValues topic content:
"cpu", 10, "2022-06-03 17:00"
"cpu", 20, "2022-06-03 18:00"
"cpu", 30, "2022-06-04 10:00"
"memory", 40, "2022-06-04 10:00"
and I want to obtain this output:
"cpu", "2022-06-03", 15
"cpu", "2022-06-04", 30
"memory", "2022-06-04", 40
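To make the expected result precise, here is a plain-Java sketch (no Kafka involved) of the calculation I’m after. The class DailyAverageSketch, the CounterValue holder and the "name|day" key format are made up just for this illustration; they are not part of my real code.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class DailyAverageSketch {
    // Hypothetical holder mirroring <counter-name, counter-value, timestamp>.
    static final class CounterValue {
        final String name;
        final double value;
        final String timestamp;

        CounterValue(String name, double value, String timestamp) {
            this.name = name;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Matches the timestamp format of the sample data, e.g. "2022-06-03 17:00".
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    // Groups the values by "name|day" and averages each group.
    static Map<String, Double> averagePerDay(List<CounterValue> values) {
        return values.stream().collect(Collectors.groupingBy(
                v -> v.name + "|" + LocalDateTime.parse(v.timestamp, FMT).toLocalDate(),
                TreeMap::new, // sorted keys, only to get a stable print order
                Collectors.averagingDouble(v -> v.value)));
    }

    public static void main(String[] args) {
        List<CounterValue> input = List.of(
                new CounterValue("cpu", 10, "2022-06-03 17:00"),
                new CounterValue("cpu", 20, "2022-06-03 18:00"),
                new CounterValue("cpu", 30, "2022-06-04 10:00"),
                new CounterValue("memory", 40, "2022-06-04 10:00"));
        averagePerDay(input).forEach((key, avg) -> System.out.println(key + " -> " + avg));
    }
}
```

For the sample data above this gives 15.0 for cpu on 2022-06-03, 30.0 for cpu on 2022-06-04 and 40.0 for memory on 2022-06-04, which is the result I would like to get from the stream.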
This is a snippet of my Kafka Streams code, which doesn’t work (it seems to calculate the count rather than the average):
counterValueStream
    .groupByKey()
    .windowedBy(tumblingWindow)
    .aggregate(StatisticValue::new, (k, counterValue, statisticValue) -> {
        statisticValue.setSamplesNumber(statisticValue.getSamplesNumber() + 1);
        statisticValue.setSum(statisticValue.getSum() + counterValue.getValue());
        return statisticValue;
    }, Materialized.with(Serdes.String(), statisticValueSerde))
    .toStream()
    .map((Windowed<String> key, StatisticValue sv) -> {
        double avgNoFormat = sv.getSum() / (double) sv.getSamplesNumber();
        double formattedAvg = Double.parseDouble(String.format("%.2f", avgNoFormat));
        return new KeyValue<>(key.key(), formattedAvg);
    })
    .to("average", Produced.with(Serdes.String(), Serdes.Double()));
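One thing I noticed while writing this up: the map step above keeps only key.key(), so the day never reaches the output topic. As far as I understand, tumbling time windows are aligned to the epoch, so a one-day window should start at 00:00 UTC and the day could be derived from key.window().start(). Below is a plain-Java sketch of that mapping. It assumes that tumblingWindow is a one-day window and that record timestamps are interpreted as UTC; the class and method names are made up for the illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

class WindowDaySketch {
    // Assumed window size: one day, in milliseconds.
    static final long DAY_MS = Duration.ofDays(1).toMillis();

    // Start of the epoch-aligned one-day window that contains the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, DAY_MS);
    }

    // UTC calendar day that a window start (epoch milliseconds) falls on.
    static LocalDate dayOf(long windowStartMs) {
        return Instant.ofEpochMilli(windowStartMs).atZone(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2022-06-03T17:00:00Z").toEpochMilli();
        System.out.println(dayOf(windowStart(ts))); // prints 2022-06-03
    }
}
```

If that is right, the output key could be built from key.key() plus the day, but I’m not sure whether this has anything to do with the wrong values I’m seeing.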
Can anyone help me, please?
Thanks in advance!