We have a Kafka Streams app that uses the cogroup feature to “join” 6 different streams. The documentation states that cogroup outputs a new aggregated message any time a new message arrives on one of the input streams.
One of the 6 streams has a considerably higher volume than the rest (20 messages/s versus 0.5 messages/s for each of the others). What we are seeing is that only a subset of the messages from the higher-volume stream shows up in the output (cogrouped) topic.
Over the same time period, there were 139 messages upstream on the high-volume topic, but the cogrouped downstream topic had only 39 messages.
We repartition (co-partition) all the topics before the cogroup, and they all use the same keys.
KTable<String, movementReport> movementReportKTable = movementAuthorizedStream
        .cogroup(new movementAuthorizedAggregator())
        .cogroup(movementLocatedStream, new movementLocatedAggregator())
        .cogroup(dtleStream, new DtleAggregator())
        .cogroup(movementConsistStream, new movementConsistAggregator())
        .cogroup(movementStoppedStartedStream, new movementStoppedStartedAggregator())
        .cogroup(movementTerminationStream, new movementTerminationAggregator())
        .aggregate(movementReport::new,
                Materialized.<String, movementReport, KeyValueStore<Bytes, byte[]>>as("movement-report-ktable-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(new ProtoSerde<>(movementReport::new))
                        // withRetention applies to window and session stores; it is ignored for a key/value store
                        .withRetention(Duration.ofDays(28)));
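For reference, the per-record behavior we expected, based on the documentation, can be modeled in plain Java with no Kafka dependency: every record from any input stream updates the aggregate for its key and produces exactly one output record. This is only a sketch of the expectation. `CogroupModel`, `onRecord`, and the string-concatenation aggregate are hypothetical stand-ins, not Kafka Streams APIs or code from our app.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CogroupModel {
    // Latest aggregate per key; stands in for the KTable state store.
    private final Map<String, String> store = new HashMap<>();
    // Every emitted update, in order; stands in for the output topic.
    private final List<String> output = new ArrayList<>();

    // Called once for each record from any of the cogrouped streams.
    public void onRecord(String streamName, String key, String value) {
        String updated = store.getOrDefault(key, "") + "[" + streamName + ":" + value + "]";
        store.put(key, updated);
        // Expected behavior: one output record per input record.
        output.add(key + "=" + updated);
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        CogroupModel model = new CogroupModel();
        model.onRecord("located", "m1", "a");
        model.onRecord("located", "m1", "b");
        model.onRecord("authorized", "m1", "c");
        System.out.println(model.output().size()); // prints 3
    }
}
```

Under this model, 139 upstream records on the high-volume stream would yield at least 139 downstream records, which is not what we observe.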
Each input stream is built as shown below; the other 5 follow the same pattern:
private KGroupedStream<String, movementAuthorized> buildmovementAuthStream(StreamsBuilder builder) {
    return builder
            .stream(props.getmovementAuthorizedInputTopic(),
                    Consumed.with(Serdes.String(), new ProtoSerde<>(movementAuthorized::new),
                            new FailOnInvalidTimestamp(), AutoOffsetReset.EARLIEST))
            .selectKey((k, v) -> movementUidUtil.validateNGenerateKey(v.getmovementUid()))
            .repartition(Repartitioned.with(Serdes.String(), new ProtoSerde<>(movementAuthorized::new))
                    .withNumberOfPartitions(coPartitionCount))
            .groupByKey();
}
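One thing that may be worth ruling out is record caching in front of the KTable state store. This is an assumption, not something confirmed for this app. When caching is enabled, consecutive updates to the same key can be collapsed between flushes, so the downstream topic sees fewer records than the upstream one. The plain-Java sketch below only models that effect; `CachedTableModel`, `update`, and `flush` are hypothetical names, not Kafka Streams APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CachedTableModel {
    // Running aggregate per key (here simply a count of updates).
    private final Map<String, Integer> store = new HashMap<>();
    // Keys updated since the last flush, in first-update order.
    private final Set<String> dirty = new LinkedHashSet<>();
    // Records actually forwarded downstream.
    private final List<String> downstream = new ArrayList<>();

    // Apply one upstream record: update the aggregate but emit nothing yet.
    public void update(String key) {
        store.merge(key, 1, Integer::sum);
        dirty.add(key);
    }

    // Models a cache flush: emit only the latest aggregate per dirty key.
    public void flush() {
        for (String key : dirty) {
            downstream.add(key + "=" + store.get(key));
        }
        dirty.clear();
    }

    public List<String> downstream() {
        return downstream;
    }

    public static void main(String[] args) {
        CachedTableModel table = new CachedTableModel();
        for (int i = 0; i < 5; i++) {
            table.update("m1"); // five updates to the same key
        }
        table.update("m2");
        table.flush();
        System.out.println(table.downstream()); // prints [m1=5, m2=1]
    }
}
```

Six upstream records produce two downstream records here. In Kafka Streams itself, the settings that govern this are, to my understanding, `statestore.cache.max.bytes` (`cache.max.bytes.buffering` in older versions) and `commit.interval.ms`; `Materialized#withCachingDisabled()` turns caching off for a single store.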