I have a Kafka Streams application that uses a RocksDB state store. I have 3 instances of this application deployed in Kubernetes, plus a dedicated Kafka cluster with 3 brokers. Memory usage on the Kafka broker instances is excessive (it reaches 25 GB within an hour), and I'm having a hard time understanding what causes it. I want to emphasize that I'm experiencing memory issues on the Kafka broker instances and NOT on the application's nodes. As a result, broker instances get terminated, followed by NotEnoughReplicasException errors.
This is my custom RocksDB configuration class:
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {

    // This object should be a member variable, so it can be closed in RocksDBConfigSetter#close.
    private final org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);

    // 10 minutes, expressed in microseconds
    private static final long DELETE_OBSOLETE_FILES_INTERVAL_MICROS = 10L * 60 * 1_000_000;

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // table config
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(16 * 1024L);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);

        options.setDeleteObsoleteFilesPeriodMicros(DELETE_OBSOLETE_FILES_INTERVAL_MICROS);
        options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);

        // FIFO compaction configuration
        options.setCompactionStyle(CompactionStyle.FIFO);
        CompactionOptionsFIFO compactionOptionsFIFO = new CompactionOptionsFIFO();
        compactionOptionsFIFO.setMaxTableFilesSize(1024 * 1024 * 1024L); // 1 GB
        options.setCompactionOptionsFIFO(compactionOptionsFIFO);

        options.setMaxOpenFiles(-1);
        options.setPeriodicCompactionSeconds(60 * 10L); // 10 minutes
        options.setTtl(60 * 10L); // 10 minutes
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();
    }
}
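For completeness, a setter like this only takes effect once it is registered on the Streams configuration via the rocksdb.config.setter property. A minimal sketch (the helper class name is mine; the string key equals StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, and the value would normally be the Class object or its fully-qualified name):

```java
import java.util.Properties;

public class RegisterRocksDBSetter {
    // Hypothetical helper: builds the property that tells Kafka Streams
    // to apply CustomRocksDBConfig to every RocksDB store it creates.
    public static Properties rocksDbProps() {
        Properties p = new Properties();
        // Key equals StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG;
        // a plain string is used here so the snippet stands alone.
        p.put("rocksdb.config.setter", "CustomRocksDBConfig");
        return p;
    }
}
```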
This is my Kafka Streams configuration (I’m not sure what causes the issue, so maybe some of the following parameters are irrelevant):
// Stream related
num.stream.threads: 40
replication.factor: 2
topology.optimization: all
windowstore.changelog.additional.retention.ms: 600000
max.task.idle.ms: -1L
state.cleanup.delay.ms: 600000
request.timeout.ms: 300000
cache.max.bytes: 100 * 1024 * 1024L
// Consumer related
max.poll.interval.ms: 600000
max.poll.records: 1000
fetch.max.wait.ms: 10000
fetch.min.bytes: 51200
// Publisher related
max.block.ms: 180000
delivery.timeout.ms: 360000
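In case the exact form matters, here is the same configuration expressed as a java.util.Properties sketch (the class name StreamsSettings is mine; keys are copied verbatim from the list above, including cache.max.bytes exactly as written):

```java
import java.util.Properties;

public class StreamsSettings {
    // Keys and values copied verbatim from the list above;
    // numeric expressions are pre-computed and passed as strings.
    public static Properties build() {
        Properties p = new Properties();
        // Stream related
        p.put("num.stream.threads", "40");
        p.put("replication.factor", "2");
        p.put("topology.optimization", "all");
        p.put("windowstore.changelog.additional.retention.ms", "600000");
        p.put("max.task.idle.ms", "-1");
        p.put("state.cleanup.delay.ms", "600000");
        p.put("request.timeout.ms", "300000");
        p.put("cache.max.bytes", String.valueOf(100 * 1024 * 1024L)); // 104857600
        // Consumer related
        p.put("max.poll.interval.ms", "600000");
        p.put("max.poll.records", "1000");
        p.put("fetch.max.wait.ms", "10000");
        p.put("fetch.min.bytes", "51200");
        // Publisher related
        p.put("max.block.ms", "180000");
        p.put("delivery.timeout.ms", "360000");
        return p;
    }
}
```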
All relevant topics are configured with min.insync.replicas=2 and a replication factor of 2.
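On the NotEnoughReplicasException: for acks=all producers, a partition rejects writes once its in-sync replica count drops below min.insync.replicas, so with 2 replicas and min.insync.replicas=2, losing a single broker that hosts a replica is enough to trigger it. A trivial illustration of that rule (the class and method names are mine):

```java
public class IsrRule {
    // For acks=all producers, a write to a partition succeeds only while
    // the partition's in-sync replica count is at least min.insync.replicas;
    // otherwise the broker returns NotEnoughReplicasException.
    static boolean partitionWritable(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }
}
```

So whichever mechanism kills a broker, any partition it hosted a replica of becomes unwritable until the broker rejoins the ISR.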
I would appreciate any assistance I can get on that matter.
Thank you in advance!