TL;DR: In Kafka Streams (3.9.1 / 4.1.0), my main stream consumer fetch rate stays low (~140k msgs/s) even after a RocksDB-backed GlobalKTable restore completes. The total fetch rate only recovers (~200k msgs/s) if I rewrite all data to the GlobalKTable topic. Why does this happen, and can RocksDB config/workarounds fix it?
I’m running a Kafka Streams application (tested with versions 3.9.1 and 4.1.0) that uses a GlobalKTable backed by RocksDB. There are multiple instances of the application, each with 4 stream threads. The application applies only very basic transformations to incoming messages (no joins or other stateful operations on the main stream). The GlobalKTable is a simple ID → value mapping used to enrich the stream. The SST files are lost after a restart, so the global stream thread re-reads the full topic on startup.
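For context, the topology is roughly the following. This is a minimal sketch, not my actual code: topic names, the store name, serdes, and the enrichment logic are placeholders.

```java
// Minimal sketch of the topology described above.
// Topic names, store name, serdes, and the join logic are placeholders.
StreamsBuilder builder = new StreamsBuilder();

// GlobalKTable: simple ID -> value mapping, backed by RocksDB by default
GlobalKTable<String, String> lookup = builder.globalTable(
        "lookup-topic",
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("lookup-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

// Main stream: basic transformation plus enrichment via the global table
builder.<String, String>stream("input-topic")
       .join(lookup,
             (key, value) -> key,                                // key selector into the global table
             (value, lookupValue) -> value + "|" + lookupValue)  // enrichment
       .to("output-topic");
```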
During startup, the GlobalKTable restore runs as expected. After the restore completes, the main stream consumer fetch rate stays relatively low (~140k msgs/s across all consumers), and CPU utilization stays at ~70%.
Once I re-produce all data to the underlying GlobalKTable topic and the global stream thread has finished processing those new records, the main consumers start fetching at a much higher rate (~200k msgs/s) and CPU utilization rises to ~90% until the lag is worked off.
The GlobalKTable holds about 500 MB of data, and the underlying topic is compacted (`cleanup.policy=compact`). I set the RocksDB block cache size to roughly the dataset size, so most of the data should be cached in memory.
Here are some of the RocksDB configs I already have or have tried; flipping these does not seem to make a difference:
```java
tableConfig.setCacheIndexAndFilterBlocks(true);
tableConfig.setPinTopLevelIndexAndFilter(true);
tableConfig.setFormatVersion(5);
options.setTableFormatConfig(tableConfig);
options.optimizeForSmallDb(cache);
options.optimizeForPointLookup(cacheSizeMb);
```
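For completeness, these options are applied through a `RocksDBConfigSetter` along the lines of the sketch below. The 512 MB cache size is an assumption based on the ~500 MB dataset, and the class/constant names are placeholders:

```java
// Sketch of how the options above are wired in via Kafka Streams'
// RocksDBConfigSetter. The 512 MB cache size is an assumption based on
// the ~500 MB dataset; names are placeholders.
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    private static final long CACHE_SIZE_BYTES = 512L * 1024 * 1024;
    private final org.rocksdb.Cache cache = new org.rocksdb.LRUCache(CACHE_SIZE_BYTES);

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig =
                (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setPinTopLevelIndexAndFilter(true);
        tableConfig.setFormatVersion(5);
        options.setTableFormatConfig(tableConfig);
        // optimizeForPointLookup takes the cache size in MB, not bytes
        options.optimizeForPointLookup(CACHE_SIZE_BYTES / (1024 * 1024));
    }

    @Override
    public void close(String storeName, Options options) {
        cache.close();
    }
}
```

The class is registered via the `rocksdb.config.setter` property (`StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG`).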
Here are a few things that I tried:

- Using both a stream–GlobalKTable join and a `FixedKeyProcessor` that calls `get` on the global state store for the needed keys. The behavior was the same with both approaches.
- Prepopulating the RocksDB block cache by iterating over the global store in a `setGlobalStateRestoreListener` callback. I verified via RocksDB metrics that the block cache filled to near capacity, but the low consumer fetch rate persisted.
- Switching to an in-memory state store to check whether the issue is caused by RocksDB. In this case the issue did not occur: the consumer fetch rate increased to what I expected.
- Using fewer consumers with much less data; the issue remained.
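For reference, the processor-based lookup from the first bullet looks roughly like this. It's a sketch with placeholder names; note that depending on the Kafka Streams version, the store handed back for a global table may be timestamped internally, so the exact store type may differ:

```java
// Sketch of the FixedKeyProcessor approach (names are placeholders;
// "lookup-store" is the name the GlobalKTable was materialized as).
public class EnrichProcessor implements FixedKeyProcessor<String, String, String> {
    private FixedKeyProcessorContext<String, String> context;
    private ReadOnlyKeyValueStore<String, String> lookupStore;

    @Override
    public void init(FixedKeyProcessorContext<String, String> context) {
        this.context = context;
        this.lookupStore = context.getStateStore("lookup-store");
    }

    @Override
    public void process(FixedKeyRecord<String, String> record) {
        // Point lookup into the RocksDB-backed global store
        String lookupValue = lookupStore.get(record.key());
        context.forward(record.withValue(record.value() + "|" + lookupValue));
    }
}
```

It is attached to the main stream with `stream.processValues(EnrichProcessor::new)`.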
Q: What could cause the main stream consumers to remain slow after a RocksDB-backed GlobalKTable restore, and is there a RocksDB configuration or workaround to avoid this?