Issue with Continuously Increasing Disk Usage

Greetings everyone,

I’m currently working on a Spring Boot-based Kafka Streams application that joins data from two topics with an “outer-join” approach. The application then aggregates the data and attempts to match records; any unmatched records are kept in a state store for roughly 10 minutes.

However, I’m running into unusual behavior with RocksDB: disk usage keeps increasing until it hits its limit, and since the application is deployed on Kubernetes, the pod eventually gets terminated. Initially, I used “persistentKeyValueStore” with the following configuration:

        // Overrides for the state stores' changelog topics (passed to withLoggingEnabled below)
        Map<String, String> config = new HashMap<>();
        config.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(30 * 60 * 1000));
        config.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, String.valueOf(30 * 60 * 1000));
        config.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, String.valueOf(0.001));
        config.put(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(5000));
        config.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);


            stateStore = Stores.keyValueStoreBuilder(
                            Stores.persistentKeyValueStore(MISSING_STORE_NAME),
                            Serdes.String(),
                            Serdes.ByteArray())
                    .withLoggingEnabled(config)
                    .withCachingEnabled();

            aggregateStore = Materialized.<String, byte[],
                            KeyValueStore<Bytes, byte[]>>as(AGGREGATE_STORE_NAME)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.ByteArray())
                    .withCachingEnabled()
                    .withLoggingEnabled(config);
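
To make the intent of the “missing” store concrete, here is a simplified sketch of how unmatched records are parked and then expired after roughly 10 minutes (placeholder names and a made-up timestamp convention, not my exact code):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class MissingRecordProcessor implements Processor<String, byte[], String, byte[]> {

    private static final Duration TTL = Duration.ofMinutes(10);

    private KeyValueStore<String, byte[]> missingStore;

    @Override
    public void init(final ProcessorContext<String, byte[]> context) {
        // The store must be connected to this processor when the topology is built.
        missingStore = context.getStateStore("missing-store"); // placeholder for MISSING_STORE_NAME
        // Sweep the store once a minute and delete everything older than the TTL.
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, now -> {
            final List<String> expired = new ArrayList<>();
            try (KeyValueIterator<String, byte[]> iter = missingStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, byte[]> entry = iter.next();
                    // Made-up convention: the first 8 bytes of the value hold the insertion time.
                    final long insertedAt = ByteBuffer.wrap(entry.value).getLong();
                    if (now - insertedAt > TTL.toMillis()) {
                        expired.add(entry.key);
                    }
                }
            }
            // Deletes also write tombstones to the changelog topic.
            expired.forEach(missingStore::delete);
        });
    }

    @Override
    public void process(final Record<String, byte[]> record) {
        // Matching/aggregation logic omitted; an unmatched record is parked with its arrival time.
        final byte[] payload = record.value() == null ? new byte[0] : record.value();
        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + payload.length);
        buffer.putLong(System.currentTimeMillis()).put(payload);
        missingStore.put(record.key(), buffer.array());
    }
}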

I am also using a custom “RocksDBConfigSetter”:

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

import java.util.Map;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    // This object should be a member variable, so it can be closed in RocksDBConfigSetter#close.
    private final org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);
    private static final long DELETE_OBSOLETE_FILES_INTERVAL_MICROS = 10 * 60 * 1000 * 1000L; // 10 minutes, in microseconds

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(16 * 1024L);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        options.setCompactionStyle(CompactionStyle.LEVEL);
        options.setLevelCompactionDynamicLevelBytes(true);
        options.setMaxWriteBufferNumber(2);
        options.setMinWriteBufferNumberToMerge(1);
        options.setDeleteObsoleteFilesPeriodMicros(DELETE_OBSOLETE_FILES_INTERVAL_MICROS);
        options.setMaxTotalWalSize(512 * 1024L * 1024L);
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();
    }

}
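
The setter is registered via the rocksdb.config.setter property, i.e. something like:

        Properties props = new Properties();
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);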


After the application is deployed, disk usage keeps growing until it reaches the 100 GB limit in approximately 4 hours. I tried to resolve the issue by switching the store to “inMemoryKeyValueStore” (sketch below), but oddly enough that did not solve it either.
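
The switch amounted to roughly this (a sketch, not my exact code):

            stateStore = Stores.keyValueStoreBuilder(
                            Stores.inMemoryKeyValueStore(MISSING_STORE_NAME),
                            Serdes.String(),
                            Serdes.ByteArray())
                    .withLoggingEnabled(config)
                    .withCachingEnabled();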

Any suggestions you may have would be greatly appreciated. Thank you!

Hi,

I am sorry to read that you have memory issues with RocksDB!

Please have a look at Kafka Streams Memory Management | Confluent Documentation

In particular, note that the cache there is defined as a static variable. That is needed so that all RocksDB state stores of the same Kafka Streams client share the same cache.

Furthermore, you can also limit the size of the memtables by using a static WriteBufferManager. You can even count the memory of the WriteBufferManager against the specified cache; that way, you control the memory limit with a single bound.

If you use a static cache and write buffer manager, please do not close them in the close() method of the config setter, since those data structures are shared by the different RocksDB state stores.
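
To make that concrete, a minimal sketch could look like the following (the byte sizes are placeholders; choose them to fit your environment):

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

import java.util.Map;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    // Static, so that all RocksDB state stores of the same Kafka Streams client share them.
    private static final long TOTAL_OFF_HEAP_MEMORY = 64 * 1024L * 1024L;   // placeholder
    private static final long TOTAL_MEMTABLE_MEMORY = 16 * 1024L * 1024L;   // placeholder
    private static final Cache CACHE = new LRUCache(TOTAL_OFF_HEAP_MEMORY);
    // Counting the memtables against the cache gives a single memory bound.
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
            new WriteBufferManager(TOTAL_MEMTABLE_MEMORY, CACHE);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(CACHE);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Intentionally empty: the static cache and write buffer manager are shared
        // by all stores of the client and must not be closed per store.
    }
}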

Best,
Bruno

Hi Bruno,

Thanks for getting back to me so quickly. Just wanted to clarify that I’m actually having a disk space issue, not a memory issue. The memory usage appears to be fine, but after about 4 hours, the disk fills up completely at 100 GB. Thanks again!

Hi,

sorry, my bad!

Do you implement your own join or do you use the one from the Streams’ DSL?

You can use the following metrics to monitor your application:
https://docs.confluent.io/platform/current/streams/monitoring.html#rocksdb-metrics

In particular, look at

compaction-pending
num-running-compactions
total-sst-files-size
estimate-num-keys

to better understand what is going on.

Compactions (compaction-pending, num-running-compactions) should reduce size on disk (total-sst-files-size) if there are updates to keys (estimate-num-keys).
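
If it helps, you can also read those metrics programmatically from the running KafkaStreams instance, roughly like this (just a sketch; depending on your Kafka version, some RocksDB metrics are only recorded when metrics.recording.level is set to DEBUG):

import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public final class RocksDBMetricsLogger {

    private static final Set<String> WATCHED = Set.of(
            "compaction-pending",
            "num-running-compactions",
            "total-sst-files-size",
            "estimate-num-keys");

    // Call this periodically (e.g. from a scheduled task) with the running KafkaStreams instance.
    public static void log(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            final MetricName name = entry.getKey();
            if (WATCHED.contains(name.name())) {
                System.out.printf("%s %s = %s%n", name.tags(), name.name(), entry.getValue().metricValue());
            }
        }
    }
}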

I see that you have already switched to level compaction to save disk space. Here is a tuning guide for level compaction:

Hi Bruno,

I have switched to FIFO compaction, and the disk usage is much better now. I’m not sure this is the right or recommended approach, but it seems to have done the trick. Thank you for the information about the RocksDB metrics; it helps me understand the behavior.
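
In case it helps anyone later, the change boils down to something like this in the RocksDBConfigSetter (a sketch; the size cap is an illustrative value, not what I actually use):

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

import java.util.Map;

public class FifoCompactionRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // FIFO compaction drops the oldest SST files once their total size exceeds the cap,
        // which puts a hard bound on disk usage per store (at the cost of silently dropping
        // the oldest data, so the cap must be larger than the store's working set).
        options.setCompactionStyle(CompactionStyle.FIFO);

        final CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
        fifoOptions.setMaxTableFilesSize(1024L * 1024L * 1024L); // illustrative 1 GB cap per store
        options.setCompactionOptionsFIFO(fifoOptions);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Nothing to close in this sketch.
    }
}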