Intermittent data loss under high volume with aggregation after map() or groupBy() (rekey, repartition) with log compaction enabled


A fairly standard use case: aggregate a list of child records by their parentId, and then join that list/map of child records to the parent record in Kafka Streams.

CHILD_TOPIC: {key: childId, value: {childId, parentId, otherChildData}}

PARENT_TOPIC: {key: parentId, value: {parentId, otherParentData}}

PARENTS_WITH_CHILDREN_TOPIC: {key: parentId, value: {parentId, childListOrMap: [{childId, parentId, otherChildData}, ...], otherParentData}}

Default cleanup.policy=compact
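To detect the loss described below, it can help to compute what PARENTS_WITH_CHILDREN_TOPIC should eventually contain and diff that against what the topology actually produced. A minimal plain-Java sketch, assuming simplified record types (Child, Parent and ParentWithChildren are hypothetical stand-ins for the Avro classes) and KTable semantics where the latest value per key wins:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reference model (plain Java, no Kafka): what PARENTS_WITH_CHILDREN_TOPIC should converge to.
final class ExpectedParentsWithChildren {
    record Child(String childId, String parentId, String otherChildData) {}
    record Parent(String parentId, String otherParentData) {}
    record ParentWithChildren(String parentId, Map<String, Child> children, String otherParentData) {}

    static Map<String, ParentWithChildren> compute(List<Child> childLog, List<Parent> parentLog) {
        // Latest version of each child by childId (last write wins, like a KTable).
        Map<String, Child> latestChild = new LinkedHashMap<>();
        for (Child c : childLog) latestChild.put(c.childId(), c);

        // Latest version of each parent by parentId.
        Map<String, Parent> latestParent = new LinkedHashMap<>();
        for (Parent p : parentLog) latestParent.put(p.parentId(), p);

        // Group the latest children by their current parentId.
        Map<String, Map<String, Child>> childrenByParent = new TreeMap<>();
        for (Child c : latestChild.values()) {
            childrenByParent.computeIfAbsent(c.parentId(), k -> new TreeMap<>()).put(c.childId(), c);
        }

        // Left join: every parent is emitted, with an empty child map if it has no children.
        Map<String, ParentWithChildren> result = new TreeMap<>();
        for (Parent p : latestParent.values()) {
            Map<String, Child> kids = childrenByParent.getOrDefault(p.parentId(), new TreeMap<>());
            result.put(p.parentId(), new ParentWithChildren(p.parentId(), kids, p.otherParentData()));
        }
        return result;
    }
}
```

Any parent whose child map in the real output topic is smaller than in this model is a candidate for the dropped-records problem.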

Attempted multiple implementation approaches (descriptive summary here, full code details below).

KStreams Approach: generate childrenByParentKTable via streams, then leftJoin it to the parent KTable
	stream(CHILD_TOPIC)
	.map() // rekey from childId to parentId
	.groupByKey() // group child records by parentId
	.aggregate(); // create a new list/map of children for the same parentId
	KTable(PARENT_TOPIC).leftJoin(childrenByParentKTable);

KTables Approach: generate childrenByParentKTable using KTables instead of streams, and write to and read from a topic before leftJoin to parent data.
	.table(CHILD_TOPIC)
	.groupBy((childId, child) -> KeyValue.pair(child.getParentId(), child))
	.aggregate()
	.toStream().to(CHILD_LIST_BY_PARENT_TOPIC_NAME);
	KTable childrenByParentKTable = builder.table(CHILD_LIST_BY_PARENT_TOPIC_NAME);
	KTable(PARENT_TOPIC).leftJoin(childrenByParentKTable);
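The KTable approach needs a subtractor because KGroupedTable.aggregate() sees updates, not only inserts: when a child's latest value changes, the old value is passed to the subtractor under its old parentId and the new value to the adder under its new parentId. A plain-Java model of that bookkeeping (GroupedTableModel is hypothetical and does not use the Kafka Streams API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of KGroupedTable.aggregate() with an adder and a subtractor (no Kafka Streams API used).
final class GroupedTableModel {
    record Child(String childId, String parentId) {}

    // childId -> latest child: the input KTable built from CHILD_TOPIC
    final Map<String, Child> table = new HashMap<>();
    // parentId -> (childId -> child): the aggregated KTable
    final Map<String, Map<String, Child>> aggregate = new TreeMap<>();

    void upsert(Child newValue) {
        Child oldValue = table.put(newValue.childId(), newValue);
        if (oldValue != null) {
            // subtractor: remove the old version from the group it belonged to
            Map<String, Child> oldGroup = aggregate.get(oldValue.parentId());
            if (oldGroup != null) {
                oldGroup.remove(oldValue.childId());
            }
        }
        // adder: add the new version to its (possibly different) group
        aggregate.computeIfAbsent(newValue.parentId(), k -> new TreeMap<>()).put(newValue.childId(), newValue);
    }
}
```

By contrast, the KStreams approach only has an adder, so a child whose parentId changes would stay in its old parent's list as well.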

In both cases, the leftJoin results are written to a downstream PARENTS_WITH_CHILDREN_TOPIC.

Both approaches lose data (child records missing from the downstream PARENTS_WITH_CHILDREN_TOPIC) under high volume if log compaction is enabled. Specifically, the required step of rekeying the child records (from key: childId to key: parentId) in order to group by parentId auto-generates an internal Streams “-repartition” topic, which contains the child records keyed by parentId. With log compaction enabled and under high volume, child records with the same parentId are dropped before they are grouped and aggregated: the “-repartition” topic is log-compacted, discarding older child records that share a parentId before the aggregation step reads them. The end result is that the downstream PARENTS_WITH_CHILDREN_TOPIC is intermittently missing child records.
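The mechanism can be reproduced without Kafka. A simplified plain-Java model (not Kafka code; RepartitionCompactionDemo and its methods are hypothetical) of a “-repartition” topic keyed by parentId, showing the worst case where the cleaner runs before the aggregation has consumed the records:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Simplified model (plain Java, not Kafka code) of a -repartition topic keyed by parentId.
final class RepartitionCompactionDemo {
    // One record in the repartition topic: key = parentId (after the rekey), value = childId.
    record Rec(String parentId, String childId) {}

    // Log compaction keeps only the latest record per key, in offset order.
    // Tombstones and the active (never-compacted) segment are ignored in this model.
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.parentId()); // move the key to its newest position
            latest.put(r.parentId(), r);
        }
        return new ArrayList<>(latest.values());
    }

    // The downstream aggregate: set of childIds per parentId, built from whatever the consumer can still read.
    static Map<String, Set<String>> aggregate(List<Rec> log) {
        Map<String, Set<String>> agg = new TreeMap<>();
        for (Rec r : log) {
            agg.computeIfAbsent(r.parentId(), k -> new TreeSet<>()).add(r.childId());
        }
        return agg;
    }
}
```

With a log of (p1,c1), (p1,c2), (p2,c3), (p1,c4), aggregating the full log gives p1 → {c1, c2, c4}, while aggregating the compacted log gives only p1 → {c4}. In the real system only records not yet consumed when the cleaner runs are lost, which is consistent with the problem showing up under high volume (consumer lag) or aggressive compaction settings.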

This behavior of missing/dropped child records can be consistently recreated by either:
(1) Creating a large volume of input child records at one time (millions)
(2) Forcing aggressive log compaction (e.g., min.cleanable.dirty.ratio=0.01, segment.ms=100, delete.retention.ms=100)

What is the recommended approach for this standard use case of aggregating children by a parentId attribute that avoids dropped/missing records under high volume?
Is there a problem with the streams code/methods/settings being used below?
Is it possible to use cleanup.policy=compact for this use case, or is keeping all data forever in the streams internal topic(s) the only solution to avoid dropped/missing child data?

Illustrative code, showing both a KStream and a KTable approach. (This was rewritten to be more generic, using parent/child; apologies for any typos/mistakes.)

//Common streams setup/configs.

private Properties getStreamsConfig() {
	Properties config = new Properties();
	config.put(StreamsConfig.APPLICATION_ID_CONFIG, "ParentChildJoinStreams");
	config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
	config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
	config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
	config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericPrimitiveAvroSerDe.class);
	config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
	config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
	config.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, SerializationExceptionHandler.class.getName());
	config.put("schema.registry.url", SCHEMA_REGISTRY_URL);
	config.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
	config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
	config.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
	config.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
	return config;
}

public Topology buildTopology() {
	final Serde<String> keySerde = new GenericPrimitiveAvroSerDe<>();
	keySerde.configure(
			Collections.singletonMap("schema.registry.url", SCHEMA_REGISTRY_URL),
			true //isKey
			);
	final Serde<InputChild> childSerde = new SpecificAvroSerde<>();
	childSerde.configure(
			Collections.singletonMap("schema.registry.url", SCHEMA_REGISTRY_URL),
			false //isKey
			);
	final Serde<ChildListByParentId> childListByParentSerde = new SpecificAvroSerde<>();
	childListByParentSerde.configure(
			Collections.singletonMap("schema.registry.url", SCHEMA_REGISTRY_URL),
			false //isKey
			);

    Initializer<ChildListByParentId> childListByParentInitializer = new Initializer<ChildListByParentId>(){
    	@Override
    	public ChildListByParentId apply(){
    		ChildListByParentId childListByParentId = new ChildListByParentId();
    		Map<String, example.common.avro.parentAndChild.Child> childList = new HashMap<String, example.common.avro.parentAndChild.Child>();
    		childListByParentId.setChildren(childList);
    		return childListByParentId;
    	}
    };
	Aggregator<String, InputChild, ChildListByParentId> childListByParentIdAggregator = new Aggregator<String, InputChild, ChildListByParentId>() {
        @Override
        public ChildListByParentId apply(final String key, final InputChild value, final ChildListByParentId aggregate) {
        	aggregate.setParentId(value.getParentId());
        	
        	//Convert InputChild to Child (e.g., not all InputChild attributes belong in the aggregated parent list of child records)
        	example.common.avro.parentAndChild.Child newValue = new Child();
        	newValue.setChildId(value.getChildId());
        	newValue.setParentId(value.getParentId());
        	newValue.setOtherValue(value.getOtherValue());
        	
        	Map<String, example.common.avro.parentAndChild.Child> childList = aggregate.getChildren();
        	childList.put(newValue.getChildId(), newValue);
        	return aggregate;
        }
    };

/************** KStreams Approach sample code: **************/

	//Group the child records by ParentId
	final KTable<String, ChildListByParentId> childListByParentIdKTable = builder.stream(CHILD_TOPIC, Consumed.with(keySerde, childSerde))
		// Set key to parentId and value to same original child value
		.map((k, v) -> new KeyValue<>((String) v.getParentId(), v))
		// Group by new key (parentId)
		.groupByKey(Grouped.with(keySerde, childSerde))
		// Aggregate into a list of children for the parent
		.aggregate(childListByParentInitializer, childListByParentIdAggregator);


/************** KTable Approach sample code: **************/
	//re-using same Initializer and Aggregator from above...

	//The subtractor...
	Aggregator<String, InputChild, ChildListByParentId> childListByParentIdSubtractor = new Aggregator<String, InputChild, ChildListByParentId>() {
		@Override
		public ChildListByParentId apply(final String key, final InputChild oldValue, final ChildListByParentId aggregate)  {
			Map<String, example.common.avro.parentAndChild.Child> childList = aggregate.getChildren();
			if(childList != null){
				childList.remove(oldValue.getChildId());
			}
			aggregate.setChildren(childList);
			return aggregate;
		}
	};
	
	//Create a KTable -- latest version of each child by childId (key)
	final KTable<String, InputChild> childByChildIdKTable = builder.table(CHILD_TOPIC, Consumed.with(keySerde, childSerde));
	//Create a KGroupedTable using KTable.groupBy method to group by a new key -- the parentId
	KGroupedTable<String, InputChild> childrenByParentIdKGroupedTabled = childByChildIdKTable.groupBy((k, v) -> new KeyValue<>((String) v.getParentId(), v));

	// Now, aggregate the latest version of all children by parentId!
	// Aggregating a KGroupedTable into a new KTable
	KTable<String, ChildListByParentId> childrenByParentAggregatedKTable = childrenByParentIdKGroupedTabled.aggregate(
			childListByParentInitializer, // initializer - create a new ChildListByParentId for this parentId
			childListByParentIdAggregator, // adder - add/update children in the map/list of children for this parentId
			childListByParentIdSubtractor, // subtractor  - remove child from the map/list of children for this parentId
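			// Note: Materialized.with(...) is static, so chaining it after as(...) would silently drop the store name
			Materialized.<String, ChildListByParentId, KeyValueStore<Bytes, byte[]>>as("childernByParentAggregatedKTable-table-store") // state store name
				.withKeySerde(keySerde) // serde for the key (parentId)
				.withValueSerde(childListByParentSerde) // serde for aggregate value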
			);
	
	// Publish the children by parentId to a topic for persistence
	childrenByParentAggregatedKTable.toStream().to(CHILD_LIST_BY_PARENT_TOPIC_NAME, Produced.with(keySerde, childListByParentSerde));
	
	// Read the children by parentId back from the topic as a KTable, for joining with the full parent data by parentId
	KTable<String, ChildListByParentId> childrenByParentIdKTable = builder.table(CHILD_LIST_BY_PARENT_TOPIC_NAME, Consumed.with(keySerde, childListByParentSerde));


/************** Common code to join aggregated child list by parentId KTable to the full parent records (of various types) **************/

	//Join the children by parentId with the full parent data topics for each parent type, and publish to new parent_with_children topics
	buildParentAtopology(childrenByParentIdKTable, keySerde);
	buildParentBtopology(childrenByParentIdKTable, keySerde);
	buildParentCtopology(childrenByParentIdKTable, keySerde);
	//... etc ...
	
	return builder.build();
}

private void buildParentAtopology(KTable<String, ChildListByParentId> childrenByParentIdKTable, Serde<String> keySerde) {
	final Serde<ParentA> parentASerde = new SpecificAvroSerde<>();
	parentASerde.configure(
			Collections.singletonMap("schema.registry.url", SCHEMA_REGISTRY_URL),
			false //isKey
			);
	
	//Create KTable for ParentA records
	final KTable<String, ParentA> parentAKTable = builder.table(PARENT_A_TOPIC_NAME, Consumed.with(keySerde, parentASerde));

	ValueJoiner<ParentA, ChildListByParentId, ParentA> parentAChildJoiner = new ValueJoiner<ParentA, ChildListByParentId, ParentA>() {
		@Override
		public ParentA apply(ParentA parentA, ChildListByParentId childrenByParentId) {
			List<example.common.avro.parent.Child> parentChild = new ArrayList<example.common.avro.parent.Child>();
			if(childrenByParentId != null && childrenByParentId.getChildren() != null && !childrenByParentId.getChildren().isEmpty()) {
				childrenByParentId.getChildren().forEach((k,v) -> {
					// Create a version of the child appropriate for this parent (e.g., not all child attributes belong in the aggregated parent list of child records)
					example.common.avro.parent.Child thisChild = new example.common.avro.parent.Child();
					thisChild.setChildId(v.getChildId());
					thisChild.setParentId(v.getParentId());
					thisChild.setOtherValue(v.getOtherValue());

					// collect into the parent's own child list (not into the aggregate)
					parentChild.add(thisChild);
				});
			} else {
				logger.info("parentAChildJoiner : no children for parentId={}",parentA.getParentId());
			}
			parentA.setChildren(parentChild);
			return parentA;
		}
	};
	
	//Join Parent A Data and Child Data and write to topic
	parentAKTable.leftJoin(childrenByParentIdKTable, parentAChildJoiner)
		.toStream().to(PARENT_A_WITH_CHILDERN_TOPIC_NAME, Produced.with(keySerde, parentASerde));
}

private void buildParentBtopology( ...etc...

I have a similar problem, see Missing records in Kafka Streams foreign-key join - Stack Overflow.

You should never configure a repartition topic with compaction. By default, repartition topics are configured with infinite retention time, and Kafka Streams issues explicit “delete record requests” after it has successfully processed the data from the repartition topic.
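The difference from compaction is that a purge only removes records the consumer has already processed. Kafka's Admin API exposes this kind of request as `Admin#deleteRecords` with `RecordsToDelete.beforeOffset(...)`; the simplified model below does not call Kafka and is not Kafka Streams' actual implementation (PurgeAfterCommitDemo is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified model (plain Java, not Kafka code) of a repartition topic with cleanup.policy=delete and
// infinite retention, where records are only removed by an explicit purge up to the committed offset.
final class PurgeAfterCommitDemo {
    private final TreeMap<Long, String> log = new TreeMap<>(); // offset -> value
    private long nextOffset = 0;

    long append(String value) {
        log.put(nextOffset, value);
        return nextOffset++;
    }

    // Everything from 'fromOffset' onward that is still in the log.
    List<String> readFrom(long fromOffset) {
        return new ArrayList<>(log.tailMap(fromOffset, true).values());
    }

    // Analogous to a delete-records request: drop only records with offset < committedOffset (already processed).
    void deleteRecordsBefore(long committedOffset) {
        log.headMap(committedOffset, false).clear();
    }

    int size() {
        return log.size();
    }
}
```

If the consumer has processed offsets 0 and 1 and committed offset 2, purging before 2 leaves offset 2 and everything after it intact, no matter how many records share a key.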


Interesting! Many thanks for your reply @mjsax.

Can you elaborate on why the cleanup policy should be delete (with infinite retention) on repartition topics? And could compaction on repartition topics lead to message loss in joins/aggregates?

Should the -subscription-registration and -subscription-response topics produced by a foreign-key join also never be compacted?

From what I understand, setting topic.cleanup.policy=compact as the default compaction setting is dangerous in Kafka Streams apps with repartitions/joins, since it causes all auto-generated (internal) topics, including the -repartition topics, to be compacted. Is this correct?

Yes, it could lead to message loss.

Correct. In general, Kafka Streams will set up internal topics with the desired configs, and there should never be a reason to change those configs.

The Kafka cluster-level default should not matter, because Kafka Streams explicitly configures the cleanup policy on a per-topic basis. (Otherwise, there would be a bug that we should fix.)

Thanks a lot, I think we now have a clue why we lose messages. I have not been able to test it yet.

About the cleanup.policy config: my mistake was to set topic.cleanup.policy=compact in the props passed to new KafkaStreams(topology, props), not at the broker level, hence the topic. prefix. And this did indeed change the behavior of the KafkaStreams app regarding its internal topics.

A warning on startup would help to resolve this mistake.
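A minimal sketch of such a check, run by the application itself on its Streams properties before constructing KafkaStreams. InternalTopicConfigGuard and its message format are hypothetical, not a Kafka Streams API. It treats topic.cleanup.policy as an error (the case in this thread) and reports any other topic.-prefixed key as a warning to review, since those settings apply to all internal topics:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical startup guard (not part of Kafka Streams): flags topic.-prefixed overrides in the
// Streams properties, which Kafka Streams applies to its internal topics (e.g. -repartition topics).
final class InternalTopicConfigGuard {
    // Same string as StreamsConfig.TOPIC_PREFIX; hard-coded so the sketch has no Kafka dependency.
    static final String TOPIC_PREFIX = "topic.";

    static List<String> check(Properties streamsProps) {
        List<String> messages = new ArrayList<>();
        // Iterate keySet() rather than stringPropertyNames(): the latter skips entries whose value is not a String.
        for (Object key : streamsProps.keySet()) {
            if (!(key instanceof String)) continue;
            String name = (String) key;
            if (!name.startsWith(TOPIC_PREFIX)) continue;
            String topicConfig = name.substring(TOPIC_PREFIX.length());
            if (topicConfig.equals("cleanup.policy")) {
                messages.add("ERROR " + name + ": overrides cleanup.policy of internal topics, including -repartition topics");
            } else {
                messages.add("WARN " + name + ": applies to all internal topics; review whether it is needed");
            }
        }
        Collections.sort(messages); // "ERROR ..." sorts before "WARN ..."
        return messages;
    }
}
```

With the config from the original question, this would report the `StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG` entry as an error and the `topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)` entry as a warning.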


Would you mind filing a ticket about it? https://issues.apache.org/jira/browse/KAFKA