Kafka Streams cogroup seems to drop messages from one of the feeds

We have a Kafka Streams app that uses the cogroup feature to “join” 6 different streams. The documentation states that cogroup outputs a new aggregated message any time one of the feeds has a new message arrive.

Out of all 6 streams, one has a much higher volume than the rest (~20 msg/s vs ~0.5 msg/s for each of the others). What we are seeing in the output (cogrouped) topic is only a subset of the high-volume messages.

Over the same time period, we saw 139 messages upstream on the high-volume topic, but the cogrouped downstream feed only had 39 messages.

We repartition (co-partition) all the topics before the cogroup, and they all have the same keys.

    KTable<String, movementReport> movementReportKTable = movementAuthorizedStream.cogroup(new movementAuthorizedAggregator())
        .cogroup(movementLocatedStream, new movementLocatedAggregator())
        .cogroup(dtleStream, new DtleAggregator())
        .cogroup(movementConsistStream, new movementConsistAggregator())
        .cogroup(movementStoppedStartedStream, new movementStoppedStartedAggregator())
        .cogroup(movementTerminationStream, new movementTerminationAggregator())
        .aggregate(movementReport::new,
            Materialized.<String, movementReport, KeyValueStore<Bytes, byte[]>>as("movement-report-ktable-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(new ProtoSerde<>(movementReport::new))
            .withRetention(Duration.ofDays(28)));

We have 5 others like this below:

  private KGroupedStream<String, movementAuthorized> buildmovementAuthStream(StreamsBuilder builder) {
    return builder.stream(props.getmovementAuthorizedInputTopic(), Consumed.with(Serdes.String(), new ProtoSerde<>(movementAuthorized::new), new FailOnInvalidTimestamp(), AutoOffsetReset.EARLIEST))
        .selectKey((k, v) -> movementUidUtil.validateNGenerateKey(v.getmovementUid()))
        .repartition(Repartitioned.with(Serdes.String(), new ProtoSerde<>(movementAuthorized::new)).withNumberOfPartitions(coPartitionCount))
        .groupByKey();
  }

I’m curious whether this is a bug in the cogroup functionality, or whether there is something specific I need to do to have it emit a new output message upon arrival of any message being cogrouped, even if a single input topic has multiple messages in a row before the other topics have messages ready to consume.

Five messages may arrive on a single topic before any of the other topics have a message. I should still see 5 cogrouped output messages, right?

       t0     t1     t2     t3     t4
    1  a1     a2     a3     a4     a5
    2  null   b1     idle
    3  null   c1     idle
    4  null   d1     idle
    5  null   e1     idle

t0 output = a1,null,null,null,null
t1 output = a2,b1,c1,d1,e1
t2 output = a3,b1,c1,d1,e1
t3 output = a4,b1,c1,d1,e1
t4 output = a5,b1,c1,d1,e1

If this is what is expected, there may be a bug somewhere or maybe a setting I’m missing.
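To make that expectation concrete, here is a toy model in plain Java (no Kafka involved; stream indices and values are just the ones from the table above). It simulates cogroup semantics *without* caching: every arriving record updates its stream’s slot and emits the full aggregate, so interleaved arrivals like b1/c1/d1/e1 each produce their own update rather than being batched:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of cogroup emission with caching disabled: one output per input record.
public class CogroupToy {
    public static void main(String[] args) {
        String[] latest = new String[5];          // latest value per input stream
        List<String> outputs = new ArrayList<>();
        // (streamIndex, value) arrivals loosely matching the timeline above
        String[][] arrivals = {
            {"0", "a1"}, {"1", "b1"}, {"2", "c1"}, {"3", "d1"}, {"4", "e1"},
            {"0", "a2"}, {"0", "a3"}, {"0", "a4"}, {"0", "a5"}
        };
        for (String[] rec : arrivals) {
            latest[Integer.parseInt(rec[0])] = rec[1];
            outputs.add(String.join(",", render(latest)));
        }
        outputs.forEach(System.out::println);
    }

    // Replace unseen slots with the literal "null", as in the table above.
    private static String[] render(String[] latest) {
        String[] out = new String[latest.length];
        for (int i = 0; i < latest.length; i++) {
            out[i] = latest[i] == null ? "null" : latest[i];
        }
        return out;
    }
}
```

With these 9 arrivals the toy prints 9 lines, starting with `a1,null,null,null,null` and ending with `a5,b1,c1,d1,e1` — i.e. one update per record, which is what the question expects from cogroup.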

Cogroup, like other operators, uses caching by default, and thus you might not see every output. You can disable caching globally by setting the cache size to zero in the config (cache.max.bytes.buffering), or at a per-operator level via Materialized#withCachingDisabled().

Cf Kafka Streams Memory Management | Confluent Documentation
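A sketch of both options, applied to the `Materialized` from the question (not verified against the exact Streams version in the thread; the config key is the one cited in the reply, renamed in newer releases):

```java
// Option 1: disable the record cache globally via the Streams config.
Properties config = new Properties();
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

// Option 2: disable caching only for this store, on the Materialized
// already used in the aggregate() call above.
Materialized.<String, movementReport, KeyValueStore<Bytes, byte[]>>as("movement-report-ktable-store")
    .withKeySerde(Serdes.String())
    .withValueSerde(new ProtoSerde<>(movementReport::new))
    .withCachingDisabled();
```

Note that even with caching enabled, updates are also flushed whenever a commit occurs, so the commit interval influences how often cached results are emitted.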

