KGroupedStream / Reduce: Dramatic Performance Degradation Over Time

Hello,

I have a stream of <Key, Value> where Value is bitemporal: it carries two timestamps, and the second timestamp is a “record correction” timestamp.

I have a “raw data” stream, and I need to build a GlobalKTable of <Key, Latest-Value> from it.

Here is a very simple Kafka Streams topology doing something like Map/Reduce:

streamBuilder
  .stream("myAllDataTopic")
  .groupByKey()
  .reduce(
    (v1, v2) -> {
      // keep the record with the latest business timestamp;
      // on a tie, prefer the latest correction timestamp
      if (v1.getTimestamp1() > v2.getTimestamp1()) return v1;
      else if (v1.getTimestamp1() < v2.getTimestamp1()) return v2;
      else return (v1.getTimestamp2() > v2.getTimestamp2() ? v1 : v2);
    })
  .toStream()
  .to("myLatestDataTopic");

I believe something is missing here. Initially I run 5,000 TPS against a single dedicated Confluent CKU, with an average latency of 200ms and a message size of 2KB, but after about 2-3 hours performance starts to degrade dramatically, to more than an hour of latency.

Maybe something is wrong with the “reduce” function, or with the approach in general? I started to suspect that over time the KGroupedStream ends up with millions of values per unique key, or something like that. Zero exceptions in the logs, GC is fine; 32 partitions, 32 processor tasks / consumers; etc.

Thanks,

Perf tuning is always difficult. Did you look into metrics (Monitor Kafka Streams Applications in Confluent Platform | Confluent Documentation) and other OS stats (e.g., does disk I/O get less efficient over time)?
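If it helps, a quick way to watch those from inside the app is to poll KafkaStreams#metrics(); the group-name filter below is only an assumption about what is interesting here, and most state-store/RocksDB metrics are only recorded with metrics.recording.level=DEBUG:

// Sketch only: dump state-store related metrics so degradation can be
// correlated with RocksDB behavior (compaction time, block-cache hit ratio, etc.).
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

import java.util.Map;

public class MetricsDump {
  static void dumpStateStoreMetrics(KafkaStreams streams) {
    Map<MetricName, ? extends Metric> metrics = streams.metrics();
    metrics.forEach((name, metric) -> {
      if (name.group().contains("state")) { // e.g. stream-state-metrics
        System.out.printf("%s %s %s = %s%n",
            name.group(), name.name(), name.tags(), metric.metricValue());
      }
    });
  }
}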

Does your key space grow?

Did you try looking into RocksDB and tuning it? Kafka Streams Memory Management for Confluent Platform | Confluent Documentation

Hi Matthias,

I did only initial performance adjustments targeting low latency: enabling caching for Kafka Streams and committing every 100ms. I easily reached 60MB/s ingress for a single dedicated CKU (which is the max), 5,000 messages per second, and 150ms-200ms average latency for the first 1-2 hours. Then latency starts to grow to minutes and then hours… with a 30MB lag. So I started to suspect that maybe I am using “aggregation” for the wrong use case. I used Kafka Streams 3.6.2, now upgraded to 3.8.0.
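For context, the low-latency settings I touched look roughly like this (a sketch, not my exact config; the cache size is illustrative, and older clients use cache.max.bytes.buffering instead of statestore.cache.max.bytes):

// Sketch only: Streams properties for the caching + 100ms commit setup described above.
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class LowLatencyConfig {
  static Properties streamsProps() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-value-builder"); // placeholder id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");       // placeholder bootstrap
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);                // commit every 100 ms
    props.put("statestore.cache.max.bytes", 256 * 1024 * 1024L);            // record cache size (illustrative)
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 32);                 // 32 partitions -> 32 tasks
    return props;
  }
}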

My assumption is that this algorithm should pick V1 from the topic and the “latest greatest” V2 from the state store, compare them, and write the new aggregate back to the state store. This performance degradation was so unexpectedly strange that I posted the message here. No memory or CPU issues; I am monitoring GC.

So let me check the local RocksDB, thanks for the suggestion.

The keyspace (cardinality of distinct keys) is 128M; I am running a message simulator and keys are randomly generated, with 128M possible values.
Even when I run with low load, 1,000 messages per second (instead of 5,000), latency gradually increases over time. I’d understand if the database were slow and a query took 10-20 seconds, but over one hour? I was kind of shocked. The only scenario I can imagine is that KGroupedStream really tries to reduce the whole historical collection (instead of comparing the “aggregate” with the “new value”).

One shocking observation:

when I restart Kafka Streams, it takes 5+ hours to start! It accumulated 300GB of data in “REDUCE-STATE-STORE-…-changelog”, and it can download only 15MB/s (32 partitions, 32 threads). Why does it try to download the whole thing? RocksDB is not corrupted (I hope; although I killed the process with kill -9 <PID>). I am not sure.

My assumption is that this algorithm should pick V1 from the topic and the “latest greatest” V2 from the state store, compare them, and write the new aggregate back to the state store.

Yes.

This performance degradation was so unexpectedly strange that I posted the message here.

Agreed. Something is off for sure.

I am not 100% sure, but maybe RocksDB compaction could be triggered after a certain number of keys has been inserted, and compaction is a blocking operation (it’s not more than a wild guess at this point though…).

Maybe a thread dump can tell you where it’s blocking, to confirm this idea?

The keyspace (cardinality of distinct keys) is 128M; I am running a message simulator and keys are randomly generated, with 128M possible values.

Do you observe the same issue when you reduce the key space?

when I restart Kafka Streams, it takes 5+ hours to start! It accumulated 300GB of data in “REDUCE-STATE-STORE-…-changelog”, and it can download only 15MB/s (32 partitions, 32 threads). Why does it try to download the whole thing? RocksDB is not corrupted (I hope; although I killed the process with kill -9 <PID>). I am not sure.

Well, kill -9 could corrupt RocksDB. If you are using exactly-once guarantees, even if RocksDB itself is not corrupted, Kafka Streams would lose track of what is in RocksDB and would need to rebuild it from scratch on restart to guarantee correctness.

You should in general never use kill -9; instead trigger a clean shutdown by calling KafkaStreams#close() from a shutdown hook: Write a Kafka Streams Application for Confluent Platform | Confluent Documentation

Also cf: How to build your first Apache Kafka Streams application using Confluent
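A minimal sketch of that clean-shutdown pattern (topology and config values are placeholders, and the close timeout is arbitrary):

// Sketch only: close() from a shutdown hook instead of kill -9, so the state
// store and checkpoint files are flushed consistently before the JVM exits.
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class CleanShutdown {
  public static void main(String[] args) throws InterruptedException {
    StreamsBuilder builder = new StreamsBuilder();
    // ... build the reduce topology from the original post here ...

    Properties props = new Properties();
    props.put("application.id", "latest-value-builder"); // placeholder
    props.put("bootstrap.servers", "broker:9092");       // placeholder
    KafkaStreams streams = new KafkaStreams(builder.build(), props);

    CountDownLatch latch = new CountDownLatch(1);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      streams.close(Duration.ofMinutes(2)); // graceful stop with a bounded wait
      latch.countDown();
    }));

    streams.start();
    latch.await(); // block until the shutdown hook has run
  }
}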

Also, one possible scenario: I was running 2 JVMs with num.stream.threads: 16 each for 32 partitions, and an accidental rebalance may have caused the per-partition RocksDB to be rebuilt from scratch. So I am now running the test with a single JVM and 32 threads.

I upgraded the JARs to 3.8.0 and explicitly configured RocksDB with larger caches. I will only know by tomorrow morning whether performance still degrades. And I’ll retest with a 10x reduced keyspace too.
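By “explicitly configured RocksDB” I mean a RocksDBConfigSetter roughly like the one below (a sketch with illustrative sizes, not my exact values), registered via rocksdb.config.setter:

// Sketch only: larger shared block cache and more write buffers for the reduce store.
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

import java.util.Map;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
  private static final Cache BLOCK_CACHE = new LRUCache(512 * 1024 * 1024L); // shared, illustrative size

  @Override
  public void setConfig(String storeName, Options options, Map<String, Object> configs) {
    BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
    tableConfig.setBlockCache(BLOCK_CACHE);   // larger block cache for reads
    tableConfig.setBlockSize(16 * 1024L);     // bigger blocks, smaller index
    options.setTableFormatConfig(tableConfig);
    options.setMaxWriteBufferNumber(4);       // more memtables before write stalls
  }

  @Override
  public void close(String storeName, Options options) {
    // the block cache is shared across stores, so it is not closed per store
  }
}

// Registered with:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);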

128M keys at 2KB message size, so I expect the state store max size will be 256GB.
“Disaster recovery” (a full rebuild of the state store) at 180MB/s egress should in theory take about 25 minutes (single CKU).

Hi Matthias,

Thank you very much for the help, I sincerely appreciate your comments and suggestions!

My update: performance was stable during an 8-hour run, no degradation, 4,000 TPS, average latency less than 100ms, min 15ms, max 2,500ms. I am happy with the results :wink: (considering it goes to the cloud encrypted). Single dedicated CKU.

I believe explicitly configuring RocksDB was the key; and I also upgraded the Kafka client libraries from 3.6.2 to 3.8.0.

Oops, after 9 hours of running (after the “changelog” topic reached 160GB) it started degrading smoothly. I believe I am on the right path and will do more monitoring & tuning; the super important part is the confirmation that GroupedStream/Reduce is applicable to the described bitemporal use case and the issue is a matter of performance tuning.
Within about 30 minutes performance “smoothly” degraded to 90 seconds average latency. I’ll run more isolated tests (with lower TPS and a single partition, to avoid “rebalancing”); will figure it out. Most probably RocksDB internals.
Thanks,

So I stopped the “producer” (random message generator), waited for an hour, and started it again. Latency was 1-2 minutes and slowly going up. All systems were definitely stable (garbage collection, RocksDB optimizations, rebalancing, etc.: one hour of zero-load “rest time”!). The issue could be as trivial as “not enough disk space”, etc.; so “we are good,” I just need to finalize the findings for proper capacity planning (hardware, libjemalloc.so, number of CKUs, etc.).
Thank you!
