Kafka Streams applications cause high memory utilization on the Kafka Broker

I have a Kafka Streams application that uses a RocksDB state store. I have 3 instances of this application deployed in Kubernetes and a dedicated Kafka cluster with 3 broker instances. From what I can see, the memory usage on the Kafka Broker instances is excessive (it reaches 25 GB within an hour), and I’m having a tough time understanding what causes it. I want to emphasize that I’m experiencing memory issues on the Kafka Broker instances and NOT on the application’s nodes. That results in the termination of Kafka Broker instances followed by NotEnoughReplicasException errors.
This is my custom RocksDB configuration class:

import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    // This object should be a member variable, so it can be closed in RocksDBConfigSetter#close.
    private final org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);
    private static final long DELETE_OBSOLETE_FILES_INTERVAL_MICROS = 10 * 60 * 1_000_000L; // 10 minutes

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {

        // table config
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(16 * 1024L);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);

        options.setDeleteObsoleteFilesPeriodMicros(DELETE_OBSOLETE_FILES_INTERVAL_MICROS);
        options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);

        // FIFO Compaction Configuration
        options.setCompactionStyle(CompactionStyle.FIFO);
        CompactionOptionsFIFO compactionOptionsFIFO = new CompactionOptionsFIFO();
        compactionOptionsFIFO.setMaxTableFilesSize(1024 * 1024 * 1024L); // 1GB
        options.setCompactionOptionsFIFO(compactionOptionsFIFO);

        options.setMaxOpenFiles(-1);
        options.setPeriodicCompactionSeconds(60 * 10L); // 10 minutes
        options.setTtl(60 * 10L); // 10 minutes
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();
    }

}
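
The class above is plugged into the Streams application via the rocksdb.config.setter property, roughly like this (a minimal sketch; the application ID and bootstrap servers are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");   // placeholder
// register the custom RocksDB config setter shown above
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);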

This is my Kafka Streams configuration (I’m not sure what causes the issue, so maybe some of the following parameters are irrelevant):

// Stream related
num.stream.threads: 40
replication.factor: 2
topology.optimization: all
windowstore.changelog.additional.retention.ms: 600000
max.task.idle.ms: -1L
state.cleanup.delay.ms: 600000
request.timeout.ms: 300000
cache.max.bytes: 100 * 1024 * 1024L

// Consumer related
max.poll.interval.ms: 600000
max.poll.records: 1000
fetch.max.wait.ms: 10000
fetch.min.bytes: 51200

// Publisher related
max.block.ms: 180000
delivery.timeout.ms: 360000
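
For completeness, a sketch of how such a configuration could be expressed in code (the keys above are abbreviated; the sketch assumes the corresponding StreamsConfig / ConsumerConfig / ProducerConfig constants, with client settings passed via the Streams consumer/producer prefixes):

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();

// Stream related
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); // "all"
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 600000L);
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, -1L);
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 600000L);
props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 300000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100 * 1024 * 1024L); // "cache.max.bytes" above

// Consumer related
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 10000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), 51200);

// Publisher related
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 180000L);
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 360000);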

All relevant topics are configured with min.insync.replicas = 2 and a replication factor of 2.

I would appreciate any assistance I can get on that matter.
Thank you in advance!

That seems to be mostly a broker question. It might be good to get a broker heap dump to understand which component is using the memory.

That results in the termination of Kafka Broker instances followed by NotEnoughReplicasException errors.

Given your configuration with replication.factor = 2 and min.insync.replicas = 2, this is expected. You have basically configured the brokers to reject writes if one partition replica is offline (note that a replication factor of 2 means you get the leader and a single follower; the leader is also counted as a replica). Thus, if the follower is offline, the leader is left as the only in-sync replica, and you cannot produce any longer.
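
If you want to double-check what a topic is actually configured with, you can read both the per-partition replica count and the effective min.insync.replicas via the Admin client. A rough sketch (topic name and bootstrap servers are placeholders):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigResource;

public class CheckTopicConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            String topic = "my-topic"; // placeholder

            // replication factor = number of replica brokers assigned to each partition
            TopicDescription description = admin.describeTopics(Collections.singleton(topic))
                    .allTopicNames().get().get(topic);
            int replicationFactor = description.partitions().get(0).replicas().size();

            // effective min.insync.replicas (topic override or broker default)
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            Config config = admin.describeConfigs(Collections.singleton(resource))
                    .all().get().get(resource);
            String minIsr = config.get("min.insync.replicas").value();

            System.out.println("replication.factor=" + replicationFactor
                    + ", min.insync.replicas=" + minIsr);
        }
    }
}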


Thank you, @mjsax, for your prompt response! I’m still trying to obtain a broker heap dump. In the meantime, to address this behavior, would it be best to modify replication.factor and/or min.insync.replicas? As mentioned, I have 3 Kafka broker instances, so maybe I should set replication.factor = 3? Is that considered good practice?

What configuration you need depends on your goals. A “standard / recommended” production configuration would use replication.factor = 3 (in combination with min.insync.replicas = 2). So yes, it’s considered good practice.

This configuration allows you to tolerate a single broker outage without any downtime, i.e., producers can still write and consumers can still read, and your data (if you are using acks=all) will be replicated to at least 2 brokers, so you are guarded against data loss, too.
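
On the Kafka Streams side, that change would roughly look like this (a sketch; note that replication.factor only applies to the internal topics Streams creates, existing topics keep their current replication factor unless you reassign partitions, and your own input/output topics have to be changed separately):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();

// newly created internal topics (changelogs, repartition topics) get 3 replicas
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);

// make sure producer writes wait for all in-sync replicas (the default in recent versions)
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

min.insync.replicas = 2 stays a topic/broker-side setting, so you would keep that as it is.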
