Record Headers in Stream Aggregate

Records are produced to a topic with a header that helps identify the type of message and is useful in some business processing:

    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, msg);
    producerRecord.headers().add("headerkey", type.getBytes(StandardCharsets.UTF_8));
    producer.send(producerRecord);

With the help of the Processor API I am able to read the headers, but my application uses streams that do the aggregation and other processing:

    streamsBuilder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .process(() -> new Processor(), "stateStore"); // able to read the headers inside the Processor class

Is there a way to read the record headers in the stream.aggregate() function of Kafka Streams?

The requirement is to read the headers for further processing:

    stream.aggregate(String::new, (key, value, aggregated) -> aggregateValues(value, aggregated),
            Materialized.with(Serdes.String(), Serdes.String())) // **header value is required here**

Hi @ArthanarisamyA ,

There’s no way to read headers in the DSL aggregate() method. I suggest either moving your aggregation into a custom processor, or using process() to map each record to a new object that contains its headers. With the latter approach, if I understood your requirements correctly, you could then use a filter to drop records you don’t want to aggregate; a sketch of that idea follows.
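
Here’s a minimal sketch of the second option, assuming Kafka Streams 3.3+ (where process() returns a KStream). The "type|payload" value layout is just an illustration of carrying the header into the value, and aggregateValues() is your existing aggregation logic:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    streamsBuilder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            // copy the "headerkey" header into the value so it survives into the DSL
            .process(() -> new ContextualProcessor<String, String, String, String>() {
                @Override
                public void process(Record<String, String> record) {
                    Header typeHeader = record.headers().lastHeader("headerkey");
                    String type = typeHeader == null
                            ? "" : new String(typeHeader.value(), StandardCharsets.UTF_8);
                    context().forward(record.withValue(type + "|" + record.value()));
                }
            })
            // drop records whose header marks them as not participating in the aggregation
            .filter((key, value) -> !value.startsWith("|"))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(String::new,
                    (key, value, aggregated) -> aggregateValues(value, aggregated),
                    Materialized.with(Serdes.String(), Serdes.String()));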

HTH,
Bill


Hi @bbejeck,

Thanks for your time and the reply, and yes, your understanding is right. There may be other records that should not participate in the consolidation, and those records need to be filtered out.

Considering your suggestion to use a custom processor for the aggregation, could you please help me understand it better? My understanding is that the custom processor’s process(Record<String, String> record) method gives only the current record for processing, whereas the aggregate(key, currVal, aggregatedVal) method provides the key, the current record, and the previously aggregated value once the stream is grouped by key.

Hi @ArthanarisamyA ,

Great question - here are the steps you’ll need to take:

  1. Add a state store to the topology and connect it to the processor.
  2. Look up the running aggregate from the store by key and combine it with the current value.
  3. Put the updated aggregation back into the store.

This is more or less the same procedure used by the DSL; a rough sketch is below.
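
The following is only an outline of those three steps, assuming Kafka Streams 3.3+; the store name "aggStore" is illustrative, and aggregateValues() refers to your existing aggregation logic from the DSL version:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    // step 1: add a state store to the topology and connect it to the processor
    streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("aggStore"),
            Serdes.String(), Serdes.String()));

    streamsBuilder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .process(() -> new ContextualProcessor<String, String, String, String>() {
                private KeyValueStore<String, String> store;

                @Override
                public void init(ProcessorContext<String, String> context) {
                    super.init(context);
                    store = context.getStateStore("aggStore");
                }

                @Override
                public void process(Record<String, String> record) {
                    // the headers are available here via record.headers()
                    // step 2: look up the running aggregate by key and combine it with the current value
                    String aggregated = store.get(record.key());
                    String updated = aggregateValues(record.value(),
                            aggregated == null ? "" : aggregated);
                    // step 3: put the updated aggregation back into the store
                    store.put(record.key(), updated);
                    context().forward(record.withValue(updated));
                }
            }, "aggStore");
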
-Bill

Thanks @bbejeck

I had been thinking of the same approach; I was just curious whether there were any better alternatives for handling it.