Greetings everyone,
I’m currently working on a Spring Boot–based Kafka Streams application that joins data from two topics with an outer join. The application then aggregates the joined data and attempts to match records; any records that remain unmatched are parked in a state store for roughly 10 minutes.
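At a high level, the topology looks roughly like this (a simplified sketch: the topic names, the merge() helper, and the join window are placeholders, not the real implementation):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Placeholder topic names and serdes.
KStream<String, byte[]> left =
        builder.stream("left-topic", Consumed.with(Serdes.String(), Serdes.ByteArray()));
KStream<String, byte[]> right =
        builder.stream("right-topic", Consumed.with(Serdes.String(), Serdes.ByteArray()));

left.outerJoin(right,
        (l, r) -> merge(l, r), // merge() stands in for the real combining logic
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)))
    .groupByKey()
    .aggregate(
        () -> new byte[0],
        (key, value, aggregate) -> merge(aggregate, value),
        aggregateStore); // the Materialized instance shown below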
However, I’m seeing some unusual behavior with RocksDB: disk usage keeps growing until it hits its limit, and since the application runs on Kubernetes, the pod eventually gets terminated. Initially, I used “persistentKeyValueStore” with the following configuration:
Map<String, String> config = new HashMap<>();
config.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(30 * 60 * 1000));
config.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, String.valueOf(30 * 60 * 1000));
config.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, String.valueOf(0.001));
config.put(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(5000));
config.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);

stateStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(MISSING_STORE_NAME),
        Serdes.String(),
        Serdes.ByteArray())
    .withLoggingEnabled(config)
    .withCachingEnabled();

aggregateStore = Materialized.<String, byte[], KeyValueStore<Bytes, byte[]>>as(AGGREGATE_STORE_NAME)
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.ByteArray())
    .withCachingEnabled()
    .withLoggingEnabled(config);
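The ~10-minute expiry of unmatched records is driven by a processor over MISSING_STORE_NAME (the constant from above), along these lines (a simplified sketch: isOlderThanTenMinutes() stands in for the real timestamp check, and the matching logic is omitted):

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class UnmatchedRecordProcessor implements Processor<String, byte[], String, byte[]> {

    private KeyValueStore<String, byte[]> store;

    @Override
    public void init(ProcessorContext<String, byte[]> context) {
        store = context.getStateStore(MISSING_STORE_NAME);
        // Scan the store once a minute and evict anything older than 10 minutes.
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, this::purgeExpired);
    }

    @Override
    public void process(Record<String, byte[]> record) {
        // Park the unmatched record until its counterpart arrives or it expires.
        store.put(record.key(), record.value());
    }

    private void purgeExpired(long nowMs) {
        try (KeyValueIterator<String, byte[]> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, byte[]> entry = it.next();
                if (isOlderThanTenMinutes(entry.value, nowMs)) {
                    store.delete(entry.key);
                }
            }
        }
    }

    private boolean isOlderThanTenMinutes(byte[] value, long nowMs) {
        // Placeholder: the real code extracts the record's timestamp from the
        // serialized value and compares it against nowMs.
        return false;
    }
}

Note that each store.delete() also produces a tombstone in the changelog topic.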
I am also using a custom “RocksDBConfigSetter”:
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

import java.util.Map;

public class CustomRocksDBConfig implements RocksDBConfigSetter {

    // This object should be a member variable, so it can be closed in RocksDBConfigSetter#close.
    private final org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);

    // 10 minutes, expressed in microseconds.
    private static final long DELETE_OBSOLETE_FILES_INTERVAL_MICROS = 10L * 60 * 1000 * 1000;

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(16 * 1024L);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        options.setCompactionStyle(CompactionStyle.LEVEL);
        options.setLevelCompactionDynamicLevelBytes(true);
        options.setMaxWriteBufferNumber(2);
        options.setMinWriteBufferNumberToMerge(1);
        options.setDeleteObsoleteFilesPeriodMicros(DELETE_OBSOLETE_FILES_INTERVAL_MICROS);
        options.setMaxTotalWalSize(512 * 1024L * 1024L);
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();
    }
}
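The setter is registered through the rocksdb.config.setter property; in plain Java configuration (the rest of the config is omitted) that looks like:

Properties props = new Properties();
// ... application.id, bootstrap.servers, default serdes, etc. ...
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);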
After deployment, disk usage climbs steadily until it reaches the 100 GB limit in approximately 4 hours. I attempted to resolve the issue by switching the store to “inMemoryKeyValueStore”, but oddly enough that did not stop the disk growth either.
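For reference, the in-memory variant was essentially just a swap of the store supplier, with the rest of the builder left unchanged:

stateStore = Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore(MISSING_STORE_NAME),
        Serdes.String(),
        Serdes.ByteArray())
    .withLoggingEnabled(config)
    .withCachingEnabled();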
Any suggestions you may have would be greatly appreciated. Thank you!