Hello, I am using Kafka Streams to batch all the events received within a time window.
Below is the code snippet showing how I am implementing this:
builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(120)))
        .aggregate(
                TestDataList::new,
                this::aggregateEvent,
                Materialized.with(Serdes.String(), testDataListSerde) // custom serde for TestDataList (not shown)
        )
        .suppress(untilWindowCloses(unbounded()))
        .toStream()
        .to(outputTopic, Produced.with(windowedKeySerde, testDataListSerde)); // windowed key serde (not shown)

// Appends each incoming event to the per-key, per-window batch
private TestDataList aggregateEvent(String key, String data, TestDataList list) {
    list.add(new TestData(data));
    return list;
}
Once the aggregated batch grows past the default limit, the write to the changelog topic fails with:
Error encountered sending record to topic:
test-batch-config-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog for task 0_0 due to:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1054490 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
I've been reading about how to solve this and tried increasing max.request.size, which works,
but it seems I would have to keep raising it every time the batch size grows, which is not ideal.
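For reference, this is roughly how I raised the limit (a sketch; I am assuming the override has to go to the producer that Kafka Streams manages internally, and the 5 MB value is only an example):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-batch-config");
// Raise max.request.size for all producers created by Kafka Streams,
// including the one writing to the changelog topic. 5 MB is just an example value.
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 5 * 1024 * 1024);
// The broker/topic side (message.max.bytes, or max.message.bytes on the changelog topic)
// also has to allow the bigger records, which is the part that does not scale for me.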
I wonder if there is a way to dynamically limit the size of the message sent to the changelog topic, in this case the batch being built in the aggregate
function, so that it never exceeds the maximum request size. Something like splitting the TestDataList
object every time it reaches 100 events and producing not one record but several?
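To make the idea concrete, this is the kind of splitting I have in mind (a rough sketch only; splitBatches is a made-up helper and the getEvents()/size() accessors on TestDataList are hypothetical):

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split one window's batch into chunks of at most maxEvents entries.
private List<TestDataList> splitBatches(TestDataList batch, int maxEvents) {
    List<TestDataList> chunks = new ArrayList<>();
    TestDataList current = new TestDataList();
    for (TestData event : batch.getEvents()) { // getEvents() is assumed; my real class exposes the entries
        if (current.size() == maxEvents) {
            chunks.add(current);
            current = new TestDataList();
        }
        current.add(event);
    }
    if (current.size() > 0) {
        chunks.add(current);
    }
    return chunks;
}

I could hook this in after the suppress, e.g. .toStream().flatMapValues(list -> splitBatches(list, 100)), but as far as I understand that would only split what goes to the output topic; the record written to the changelog by the aggregation itself would still contain the full list.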
Things I've tried:
- The suppress operator with a size-bounded buffer:
  .suppress(untilTimeLimit(ofSeconds(120), maxBytes(100_000L).emitEarlyWhenFull()))
  but it seems this only controls the in-memory buffer of the suppression and does not limit the size of the record sent to the changelog topic.
- Implementing a custom ProductionExceptionHandler to handle the RecordTooLargeException and resend the split message from there, but I am not sure which key to use for the resent chunks (rough skeleton below).
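This is roughly the skeleton I have for that attempt (a sketch, assuming the handler is registered via StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG; the splitting/resending part is exactly what I cannot figure out, since the handler only sees the already-serialized changelog key and value):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class SplitTooLargeRecordHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // record.value() holds the serialized TestDataList; I could deserialize and
            // split it here, but I do not know which key the resent chunks should get
            // so that the windowed state store / changelog stays consistent.
            return ProductionExceptionHandlerResponse.CONTINUE; // drop for now instead of failing
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }
}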
Any advice or recommendation on how to handle this situation would be appreciated.