Avoiding RecordTooLargeException on Kafka Streams

Hello, I am using Kafka Streams to create batches of all events received in a time window.

Below is the code snippet showing how I am implementing this functionality:

builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(120)))
        .aggregate(
                () -> new TestDataList(),
                this::aggregateEvent,
                // testDataListSerde: custom serde for the TestDataList accumulator (not shown here)
                Materialized.with(Serdes.String(), testDataListSerde)
        )
        .suppress(untilWindowCloses(unbounded()))
        .toStream()
        .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

// Aggregator: appends each incoming event to the batch for its key and window
private TestDataList aggregateEvent(String key, String data, TestDataList list) {
    list.add(new TestData(data));
    return list;
}
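
For completeness, TestDataList is just a simple wrapper around a list of events; a simplified sketch of what it is assumed to look like (the real classes also need a matching serde so they can be written to the state store and changelog):

import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the accumulator types used above (assumption: the real
// classes may carry more fields and helper methods).
public class TestDataList {
    private final List<TestData> events = new ArrayList<>();

    public void add(TestData event) {
        events.add(event);
    }

    public int size() {
        return events.size();
    }
}

class TestData {
    private final String payload;

    TestData(String payload) {
        this.payload = payload;
    }
}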

Error encountered sending record to topic:

test-batch-config-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog for task 0_0 due to:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1054490 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

I’ve been reading about how to solve this and tried increasing max.request.size, which works, but it seems this will need to be raised again every time the batch size grows, which is not ideal.
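
For reference, I bumped the limit roughly like this in the Streams configuration (the value and the bootstrap server are just example placeholders; the setting also has to stay below the broker/topic message.max.bytes limit):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-batch-config");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example placeholder
// Applied to the internal producers (including changelog writes) via the producer prefix.
// Example value: 5 MB.
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 5 * 1024 * 1024);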

I wonder if there is a way of dynamically limiting the size of the message sent to the changelog topic, in this case the batch being created in the aggregate function, so that it does not exceed the maximum request size. Something like splitting the TestDataList object every time it reaches 100 messages and producing not one, but many of them?

Things I’ve tried:

  1. The suppress operator:

.suppress(untilTimeLimit(ofSeconds(120), maxBytes(100_000L).emitEarlyWhenFull()))

but it seems this just controls the memory usage and does not limit the size of the message sent to the changelog topic.

  2. I also tried implementing a ProductionExceptionHandler to catch the RecordTooLargeException and resend the split message from there (roughly as sketched after this list), but I am not sure which key to use.
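
For the second point, this is roughly the skeleton I experimented with (sketch only; depending on the Kafka version the exact handle() signature differs slightly). The record the handler receives is already serialized to bytes, which is part of why I do not know which key to use for resending the split pieces:

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Sketch of the handler I tried. CONTINUE only skips the failed record; actually
// resending a split version from here would need the original key and value,
// which at this point are only available as byte[].
public class SplitAndRetryHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}

It gets registered via the default.production.exception.handler property in the Streams config.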

Any advice or recommendation on how to handle this situation would be appreciated.

The simplest way might be a `flatMapValues()` before you write into the output topic?

Syntax might not be correct, but something like this:

...
.toStream()
.flatMapValues(v -> {
    if (v.size() > THRESHOLD) { // too big: split into two records
        return Arrays.asList(v.getFirstHalfOfList(), v.getSecondHalfOfList());
    }
    return Collections.singletonList(v); // small enough: forward unmodified
})
.to(...);

Thanks for the quick response. Yeah, I tried it and still got the same error.

I think the problem is that, according to the exception, the error happens while sending the record to the topic test-batch-config-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog, which is earlier in the flow. If I am not wrong this happens right after the aggregate step, so the list produced there is the one that would need to be split. But aggregate only expects one final accumulator, so I guess it is not possible to return two lists from there and expect it to produce two messages…