Aggregate uses old changelog state after rebalance

Hi, we have some trouble with our Kafka Streams application. We merge multiple streams and group them by key. The result is aggregated and written to a new topic. The aggregate step uses a state store with logging enabled, so that we know the last state of each key after a service crash and can rebuild the store.

The changelog topic of the store is quite big, and this seems to cause issues after a rebalance or while we have connectivity problems with the Kafka cluster. Several times we saw an instance use an older state from the changelog topic shortly after it took over a new task, and combine that outdated state with a newly arriving message. It looks like the changelog topic wasn’t yet consumed completely, or the instance didn’t know that more messages had to be consumed. From that point on the aggregation results are wrong, because the latest state is overwritten with the outdated one.

Example:

Changelog Topic:
Msg 1. A: A01
Msg 2. B: B01
Msg 3. C: C01  <- new instance had consumed the changelog up to this point when a new message arrived
Msg 4. A: A02
Msg 5. B: B02
Msg 6. A: A01  <- old state from Msg 1 is used again and newer state from Msg 4 is overwritten
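
The sequence above can be reproduced with a small stand-alone simulation. This is only a sketch of the suspected behaviour: it uses plain Java collections instead of Kafka Streams, and the class and method names (PartialRestoreSketch, exampleChangelog, restore) are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartialRestoreSketch {

    /** Changelog records Msg 1 to Msg 5 from the example, in offset order. */
    static List<Map.Entry<String, String>> exampleChangelog() {
        return List.of(
                Map.entry("A", "A01"),
                Map.entry("B", "B01"),
                Map.entry("C", "C01"),
                Map.entry("A", "A02"),
                Map.entry("B", "B02"));
    }

    /** Replays the first `count` records into an empty store; the latest value per key wins. */
    static Map<String, String> restore(List<Map.Entry<String, String>> changelog, int count) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : changelog.subList(0, count)) {
            store.put(record.getKey(), record.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = exampleChangelog();

        // Complete restore: the aggregator would continue from the latest state of A.
        Map<String, String> complete = restore(changelog, changelog.size());
        System.out.println("A after complete restore: " + complete.get("A")); // A02

        // Restore stopped after Msg 3: the aggregator sees the outdated state of A.
        Map<String, String> partial = restore(changelog, 3);
        System.out.println("A after partial restore:  " + partial.get("A")); // A01
    }
}
```

If the aggregator runs against the partially restored store, the next changelog write for key A is based on A01, which matches Msg 6 in the example.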

Topology:

inputStream_A.merge(inputStream_B)
	.merge(inputStream_C)
	.merge(inputStream_D)
	.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
	.aggregate(DataBean::new, DataAggregator,
			Materialized.<String, DataBean>as(Stores.persistentKeyValueStore(AGGREGATION_STORE_NAME))
					.withKeySerde(Serdes.String())
					.withValueSerde(dataBeanSerde)
					.withLoggingEnabled(buildConfigurationStores(retentionTime, deleteRetentionTime)))
	.toStream()
	.filter(dataBeanFilter)
	.to(outputTopic, Produced.with(Serdes.String(), dataBeanSerde));

The state store topic is configured as “compact, delete” with a retention time of 2 months and holds several million messages. Some of the objects aren’t touched for weeks before they become relevant again or can be safely removed from the store.
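
For reference, a helper that produces such a changelog configuration could look roughly like the sketch below. It is not our actual buildConfigurationStores implementation: the class name, the Duration parameter types, and the concrete values (60 days as an approximation of 2 months, 1 day for delete retention) are assumptions for illustration. The keys are the standard Kafka topic configs cleanup.policy, retention.ms and delete.retention.ms, and the resulting map has the Map<String, String> shape that withLoggingEnabled accepts.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

final class ChangelogConfigSketch {

    /** Builds topic-level overrides for a changelog topic that is both compacted and time-limited. */
    static Map<String, String> buildChangelogConfig(Duration retentionTime, Duration deleteRetentionTime) {
        Map<String, String> config = new HashMap<>();
        // Compact by key, and additionally delete segments older than retention.ms.
        config.put("cleanup.policy", "compact,delete");
        config.put("retention.ms", String.valueOf(retentionTime.toMillis()));
        // How long tombstones are kept after compaction.
        config.put("delete.retention.ms", String.valueOf(deleteRetentionTime.toMillis()));
        return config;
    }

    public static void main(String[] args) {
        // Assumed values: 60 days retention, 1 day delete retention.
        Map<String, String> config = buildChangelogConfig(Duration.ofDays(60), Duration.ofDays(1));
        config.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the delete policy enabled, a key that has not been updated within the retention time can disappear from the changelog, which is why the retention has to cover the weeks in which some objects stay untouched.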

Are there any special settings that tell Kafka Streams to wait until the changelog is consumed completely before new messages are processed?

Could it be that you hit KAFKA-16017 (Checkpointed offset is incorrect when task is revived and restoring)? It’s already fixed in newer versions.

This sounds like the issue we are currently facing. We are currently on Kafka Streams 3.4.0, but an upgrade to 3.7.1 is in progress. We will monitor the application afterwards and hope for the best.

Thank you for the support.
