Kafka Streams application goes out of memory during startup

2024-03-14 12:19:11,667 GMT ERROR [kafka-producer-network-thread | correlator_streams5-e5a602ae-d04d-4aa6-9b27-c9646b58edf6-StreamThread-1-0_0-producer] KafkaThread uncaughtException - Uncaught exception in thread 'kafka-producer-network-thread | correlator_streams5-e5a602ae-d04d-4aa6-9b27-c9646b58edf6-StreamThread-1-0_0-producer':
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_382]
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_382]
at org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:28) ~[correlator-1.0.jar:?]
at org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:103) ~[correlator-1.0.jar:?]
at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:96) ~[correlator-1.0.jar:?]
at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:503) ~[correlator-1.0.jar:?]
at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:474) ~[correlator-1.0.jar:?]

I took a heap dump and I see a lot of ProducerBatch objects

You may want to include the Kafka Streams version you are talking about.

288MB doesn't seem like a crazy amount of heap space, though. You may want to increase your Java heap.

Hi @lbrutschy ,

Thanks for the reply…

I am using version 2.3.0 of the kafka-streams library.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.xerial.snappy</groupId>
    <artifactId>snappy-java</artifactId>
</dependency>

I am running the application with 512MB max heap.
I just wanted to understand how the 288MB producer batch got created. What is the default size up to which records will be batched? How can I limit the batch size in Kafka Streams? And how can I get the topic name and partition from the heap dump?
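
For context, here is a minimal sketch of how I understand producer batching and memory limits can be capped from Kafka Streams via producer-prefixed configs (untested against 2.3.0; the override values are examples only, not a recommendation). For the plain Java producer, the defaults I am aware of are batch.size = 16 KB, buffer.memory = 32 MB, and max.request.size = 1 MB.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ProducerLimitsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "correlator_streams5");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        // Producer settings are passed through Streams with the "producer." prefix.
        // Example values only.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 16384);         // max bytes per per-partition batch
        props.put(StreamsConfig.producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 16777216);   // total memory each producer may buffer
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 1048576); // max size of a single produce request

        // props would then be passed to new KafkaStreams(topology, props)
    }
}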

The following is the Streams config that I have used:
INFO [main] StreamsConfig logAll - StreamsConfig values:
application.id = correlator_streams5
application.server =
bootstrap.servers = [127.0.0.1:9092]
buffered.records.per.partition = 5000
cache.max.bytes.buffering = 20971520
client.id =
commit.interval.ms = 100
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp
default.value.serde = class io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters =
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = data/correlator/
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

Thanks
Shrenik

Can you try this with a more recent version of Kafka Streams, using the exactly_once_v2 processing guarantee? I wonder if it may be related to the use of many producers in exactly-once v1.
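
As far as I know, exactly_once (v1) creates a transactional producer per task, while exactly_once_v2 shares one producer per stream thread, which is why it tends to use less memory. A minimal sketch of the suggested change, assuming Kafka Streams 3.x where the StreamsConfig.EXACTLY_ONCE_V2 constant is available (and brokers on 2.5 or newer):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class Eos2ConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "correlator_streams5");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        // Switch from exactly_once (producer per task) to exactly_once_v2
        // (producer per stream thread).
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
    }
}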