What is the limit on the heap memory a state store can use?

I am running a Kafka Streams application where the heap usage keeps increasing with the size of the state store. How can I limit the heap usage of the state store?

Note: In my application the data will not get compacted by nature, because each message has a unique key.

Are you talking about the JVM heap usage increasing? Assuming you are using RocksDB state stores, the main thing that should influence on-heap memory usage is the record caches. You can reduce the on-heap caches by lowering StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG.
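
For illustration, here is a minimal sketch of what that could look like, assuming Kafka Streams 3.4+ where STATESTORE_CACHE_MAX_BYTES_CONFIG (statestore.cache.max.bytes) is available; on older releases the equivalent knob is CACHE_MAX_BYTES_BUFFERING_CONFIG (cache.max.bytes.buffering). The class name and the 10 MB budget are placeholders, not anything from your application:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class CacheConfigExample {

    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "correlator_streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // Total on-heap record cache shared by all stream threads, here capped at 10 MB.
        // A smaller value means less heap used for caching, at the cost of more
        // frequent writes to the state stores and downstream topics.
        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
        return props;
    }
}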

There are also some other caches in the consumer, producer, etc. that may influence heap memory consumption. Check out Kafka Streams Memory Management for Confluent Platform | Confluent Documentation for more information.

Thanks for the reply @lbrutschy

But I am creating the state store with caching disabled:

StoreBuilder<KeyValueStore<String, Object>> alarmStore =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(Constants.ALARM_STORE),
                Serdes.String(),
                Serdes.serdeFrom(serializer, deserializer))
            .withCachingDisabled();

But the JVM memory usage still increases with the size of the state store, even with caching disabled.

2024-03-26 04:29:07,611 CDT INFO [main] StreamsConfig logAll - StreamsConfig values:
application.id = correlator_streams
application.server =
bootstrap.servers = [127.0.0.1:9092]
buffered.records.per.partition = 5000
cache.max.bytes.buffering = 20971520
client.id =
commit.interval.ms = 100
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp
default.value.serde = class io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters =
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = data/correlator/
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

I am using version 2.3.0 of the Kafka Streams library.

Please help here

Thanks
Shrenik

  • How are you measuring the heap consumption?
  • Why are you using 2.3.0, which is very old?
  • Have you tried taking a heap dump and seeing which data structure is holding all the data? (A sketch of one way to capture a dump follows below.)
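
In case it helps, here is a minimal sketch of one way to capture a heap dump from inside the JVM via the standard HotSpotDiagnosticMXBean (the class name and output path are placeholders); the same dump can be taken externally with jmap, and the resulting .hprof file can then be opened in a heap analyzer such as Eclipse MAT to see which objects retain the memory:

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;

import com.sun.management.HotSpotDiagnosticMXBean;

public class HeapDumpExample {

    public static void dumpHeap(String filePath) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        HotSpotDiagnosticMXBean bean = ManagementFactory.newPlatformMXBeanProxy(
                server, "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class);
        // live = true dumps only objects that are still reachable, which is
        // what you want when checking what the heap actually retains.
        bean.dumpHeap(filePath, true);
    }

    public static void main(String[] args) throws Exception {
        // Placeholder output path; the file must not already exist.
        dumpHeap("/tmp/kafka-streams-heap.hprof");
    }
}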