Kafka stream metrics counters

I’m collecting a series of metrics for a kafka stream application, the issue I have is I’d like a consolidated value for the meters of a specific name. To make this a little clearer, these metrics are presented as an array of n items with n being the number of threads configured for the Kafka Streams application. I must add that I can consolidate the values by adding the sum of the ‘FunctionCounter’ totals. I however don’t have a mechanism to trigger the ‘aggregator method’ when the meters I’m interested in are updated. The aggregator method is copied below.

private Double aggregateValues(String idName){
    return meterRegistry.getMeters().stream()
        .filter(meter -> meter.getId().getName().startsWith(idName))
        .filter(FunctionCounter.class::isInstance)
        .map(FunctionCounter.class::cast)
        .mapToDouble(FunctionCounter::count)
        .sum();
}

I created a configuration bean to attempt the same, no joy

@Configuration
@Component
@Slf4j
public class Metrics {

    @Bean
    public FunctionCounter getAggregateCounter(MeterRegistry registry) {
        List<FunctionCounter> counters = registry.getMeters().stream().
            filter(meter -> meter.getId().getName().
                startsWith("Output_Message_Count"))
            .filter( FunctionCounter.class::isInstance )
            .map(FunctionCounter.class::cast)
            .collect(Collectors.toList());

        FunctionCounter counter = FunctionCounter
                .builder("Combined_Output_Message_Count", counters, state -> state.stream().mapToDouble(FunctionCounter::count).sum())
                .description("a description of what this counter does")
                .tags("region", "test")
                .register(registry);
        return counter;
    }
}

An example of the raw data output from actuator/prometheus endpoint os listed below

# HELP kafka_stream_thread_task_created_total The total number of newly created tasks
# TYPE kafka_stream_thread_task_created_total counter
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-4",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-3",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-2",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-1",} 5.0

The end goal is to have a meter that consolidates the totals, in. the. Case of the raw data shown above, 20
There has been talk of this work being done in the following proposal but it hasn’t progressed beyond the initial proposal stage
The proposal can be viewed at. The following url :
https://cwiki.apache.org/confluence/display/KAFKA/KIP-674%3A+Metric+Reporter+to+Aggregate+Metrics+in+Kafka+Streams