Avoiding RecordTooLargeException on Kafka Streams

Hello, I am using Kafka Streams to create batches of all events received in a time window.

Below is the code snippet showing how I am implementing this functionality:

builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
          .groupByKey()
          .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(120)))
          .aggregate(
                () -> new TestDataList(),
                this::aggregateEvent,
                // note: the value serde here needs to handle TestDataList
                Materialized.with(Serdes.String(), Serdes.String())
          )
        .suppress(untilWindowCloses(unbounded()))
        .toStream()
        .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

private TestDataList aggregateEvent(String key, String data, TestDataList list) {
   list.add(new TestData(data));
   return list;
}

Error encountered sending record to topic:

test-batch-config-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog for task 0_0 due to:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1054490 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

I’ve been reading about how to solve this and tried increasing max.request.size, which works, but it seems I would have to raise it again every time the batch size grows, which is not ideal.
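For reference, this is how I raised the limit. Kafka Streams forwards `producer.`-prefixed settings to its internal producers; the value below is just an example (the broker-side message.max.bytes / topic-level max.message.bytes may also need to be raised to match):

```
# passed through to the Streams application's internal producers
producer.max.request.size=2097152
```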

I wonder if there is a way of dynamically limiting the size of the message sent to the changelog topic, in this case the batch being created in the aggregate function, so that it does not exceed the maximum request size. Something like splitting the TestDataList object every time it reaches 100 messages and producing not one but many records?

Things I’ve tried:

  1. The `suppress()` operator:

.suppress(untilTimeLimit(ofSeconds(120), maxBytes(100_000L).emitEarlyWhenFull()))

but it seems this only controls the in-memory buffer and does not limit the size of the record sent to the changelog topic.

  2. I also tried implementing a custom `ProductionExceptionHandler` to catch the `RecordTooLargeException` and resend the split message from there, but I am not sure which key to use.

Any advice or recommendation on how to handle this situation would be appreciated.

The simplest way might be a `flatMapValues()` before you write into the output topic?

Syntax might not be correct, but something like this:

...
.toStream()
.flatMapValues(v -> {
    if (v.size() > THRESHOLD) { // split into two records
        return Arrays.asList(v.getFirstHalfOfList(), v.getSecondHalfOfList());
    }
    return Collections.singletonList(v); // small enough to return unmodified
})
.to(...);

Thanks for the quick response. Yeah, I tried that and still got the same error.

I think the problem is that, according to the exception, the error happens while sending the record to topic test-batch-config-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog, which is earlier in the flow. If I am not wrong, this happens right after the aggregate step, so the list produced there is the one that would need to be split. But aggregate only expects one final accumulator, so I guess it is not possible to return two lists from there and expect it to produce two messages…

Oh, yes. This makes sense.

Sounds difficult to work around. The most straightforward, but also complex, way would be to fall back to a custom Processor. This allows you to do whatever you want. Of course, it also means that you re-implement windowing by yourself…

Another workaround could be to change the aggregator and let it clear the list if the number of entries becomes too large. For this case, you could add an additional field to TestDataList that counts up by one each time the list gets reset to zero entries. This way, you can distinguish different partial results (of course, they still have the same key, so you should not use suppress(), and you should disable caching via Materialized on the aggregation step itself, to make sure you get every individual update; otherwise you might miss data) and also stitch the result back together further downstream.
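That idea could be sketched roughly like this in plain Java (TestDataList, MAX_BATCH_SIZE, and the String payload are illustrative assumptions, not anything from the Kafka API); the aggregator would then simply call add():

```java
import java.util.ArrayList;
import java.util.List;

// Rough sketch of the "clear the list and count chunks" idea.
// TestDataList and MAX_BATCH_SIZE are illustrative names, not Kafka APIs.
class TestDataList {
    static final int MAX_BATCH_SIZE = 100; // tune so a serialized chunk stays under max.request.size

    final List<String> events = new ArrayList<>();
    int chunkId = 0; // bumped each time the list is reset, so partial results are distinguishable

    TestDataList add(String data) {
        if (events.size() >= MAX_BATCH_SIZE) {
            events.clear(); // drop the previous chunk from the accumulator...
            chunkId++;      // ...and mark that a new partial result has started
        }
        events.add(data);
        return this;
    }
}
```

With caching disabled and no suppress(), every update is forwarded, so the last update seen for a given (key, chunkId) pair is that chunk's complete content, and a downstream consumer can stitch chunks together by grouping on that pair.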

Thanks a million for the ideas; it is very valuable for us developers to have such good support from you guys at Confluent. I’ll need to deep dive into them and make a decision.
