Help tuning performance: 8-second latency with Kafka Streams

Hello,

I am working on a trivial financial application with a bitemporal data flow: each record has a “marketTime”, a “correctionTime”, and a “business key”.

I use a very simple Kafka Streams app which does this: groupByKey and reduce (comparing timestamps and picking the latest record).

A GlobalKTable is created at the end, which stores the “latest” record per business key.
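Roughly, that topology would look like the following (a minimal sketch only; the topic names, the MarketRecord value type, and the timestamp comparison are placeholders, not the actual code):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;

    public class LatestByKeyTopology {

        // "MarketRecord" stands in for whatever value type carries marketTime/correctionTime
        static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();

            KStream<String, MarketRecord> input = builder.stream("input-topic");

            input.groupByKey()
                 .reduce((current, candidate) ->
                     // keep whichever record carries the later correction timestamp
                     candidate.getCorrectionTime() >= current.getCorrectionTime() ? candidate : current)
                 .toStream()
                 .to("latest-by-key");

            // the "latest" record per business key, read back as a GlobalKTable
            // (in the real setup this may live in a separate application reading the output topic)
            GlobalKTable<String, MarketRecord> latest = builder.globalTable("latest-by-key");

            return builder.build();
        }
    }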

I run a stress test with 1 record per minute, and it takes 8 seconds end to end.
I run 1,000 records per second and get about 17 seconds average latency per message. Scalability is excellent; the processor runs in a single JVM with 8 threads and 128 partitions.
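In config terms the processor instance is simply the following (a sketch, with a placeholder application id and bootstrap server):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-by-key-app");   // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");      // placeholder
    // one JVM instance running 8 processing threads, as described above
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);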

But why does a single message per minute take 8 seconds, with Confluent Cloud? The average message size is 2 KB. I frankly expected 500–700 ms (the time it takes a Google Cloud Function to update a remote database, as an example).

I tuned acks=1 and min.insync.replicas=1 for the producer, but I can’t find similar settings for the processor.
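For what it’s worth, producer-level settings such as acks can be handed to the Streams app through its own configuration (a sketch only; note that min.insync.replicas is a topic/broker-level setting rather than a producer one):

    import org.apache.kafka.clients.producer.ProducerConfig;

    // Kafka Streams forwards configs wrapped with producerPrefix() to its internal producers
    props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "1");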

Throughput is excellent, but latency?

8 seconds of latency with near-zero load (a single message per minute)?

Hard to say with the information provided, but I would assume it might be related to caching? See “Kafka Streams Memory Management for Confluent Platform” in the Confluent documentation.

Can you try to disable caching by setting the cache size to zero?
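Something along these lines in the Streams configuration (a sketch; the exact property name depends on the Kafka Streams version):

    // newer versions: disable the record cache entirely
    props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);
    // older versions use the (since deprecated) equivalent:
    // props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);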

I found out why this happens, and it is really witty:

The producer emits records at 11:00:00, 11:01:00, 11:02:00, 11:03:00.
Then we have Kafka Streams, and a consumer for the final aggregated records.
The consumer receives the records at 11:00:08, 11:01:08, 11:02:08, 11:03:08.

This happened after I decided to isolate the issue and go with default settings on newly created, empty topics. Confluent Cloud, Dedicated CKU.

Now, this is funny: by default, commit.interval.ms=30000 (30 seconds), so the consumer will (accidentally) poll records at 11:00:08, 11:00:38, 11:01:08, 11:01:38, 11:02:08, 11:02:38, 11:03:08, 11:03:38. 8 seconds of “accidental” latency.

After finding this I reduced commit.interval.ms to 100 and poll.ms to 10 and got excellent results: the producer generates 4,000 messages per second, and I see an average latency of 160 ms with a max latency of 700 ms during a long run.
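In config terms, that change is roughly the following (a sketch; props is the Streams configuration from before):

    // flush results and commit offsets every 100 ms instead of the 30-second default
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    // block at most 10 ms in poll() when no new data is available
    props.put(StreamsConfig.POLL_MS_CONFIG, 10);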

And finally I “hit the wall”: a Dedicated CKU allows 15,000 requests per second, and I was doing nearly 4,000 (producer) + 8,000 (Streams processor) + 4,000 (consumer).

So these numbers are good, considering that a network round trip takes about 50 ms.

I know I can increase throughput by batching (and I was researching how to batch the Streams processor…) → but latency degrades drastically; for my use case it is easier to add more CKUs if we need more throughput.

Your description actually aligns with my theory. However, your understanding is not totally correct.

The consumer does poll data all the time; however, because of caching, results might be held back. On commit, the caches are flushed, and that is when you observe the results.

Reducing the commit interval does help, but the change is not on the “input path” (as you suspected) but on the “output path”. Disabling caching would give a similar result to reducing the commit interval. I also think that poll.ms should not be too relevant, because it is the time poll() would block if there is no data – reducing it should not really improve your latency.

I am just following up to make sure you understand how Kafka Streams works, so you can make the right config changes. Committing offsets has some overhead, and thus I am not sure if you really want to reduce it to 100 ms. Disabling caching might be a better option for your use case.

I know I can increase throughput by batching (and I was researching how to batch the Streams processor…) → but latency degrades drastically; for my use case it is easier to add more CKUs if we need more throughput.

There are many ways you can play with this… caching is some form of “batching” if you wish (even if it is more a de-duplication of updates to the same key). Doing some caching would actually reduce the number of writes and help you avoid hitting the request limits. Producer linger.ms is also an interesting one… HTH.
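For the plain producer, the batching knobs mentioned would look roughly like this in its Properties (the values are purely illustrative):

    // wait up to a few milliseconds so multiple records can be sent in one batch
    props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
    // allow batches of up to 64 KB per partition
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);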

Thanks for the suggestion! Yes, the current config uses the state-store-cache-max-size: 2147483647 setting. I was also wondering why it uses local RocksDB (my code doesn’t explicitly use Materialized):
.groupByKey().reduce(...).toStream().to("outputTopic")
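For context: reduce() always materializes a state store for the aggregation, and that store is RocksDB-backed by default even without an explicit Materialized. A sketch of making it explicit, or switching to an in-memory store instead (the store name and the MarketRecord type are assumptions):

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    // same topology, but with the store named and kept in memory instead of RocksDB
    input.groupByKey()
         .reduce(
             (current, candidate) -> candidate,   // placeholder reducer
             Materialized.<String, MarketRecord, KeyValueStore<Bytes, byte[]>>as(
                 Stores.inMemoryKeyValueStore("latest-store")))
         .toStream()
         .to("outputTopic");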

I am trying now with the cache disabled. Yes, I know unnecessary auto-commits (even when there is nothing to commit) across 64 partitions are not efficient; for now I am just trying to disable the cache and check if I can go over 5,000 messages per second. I tried to use “linger” and “batch” for the producer, but latency instantly goes up 20x (including a huge lag). I assume I could push 100 batches per second from the producer (100,000 records per second), but Streams cannot handle that (15,000 requests per second is the limit; how do I get batching for Streams?).

There are many surprises which I don’t understand…

I think I understand more now: caching in Streams effectively acts as batching, and commit.interval.ms plays a role when caching is enabled. So my approach is good then; the processor commits batches every 100 ms, minimizing [or maximizing ;)] “requests per second”.
The Streams API also uses repartition topics and other internal topics, so maybe sending a single message end to end requires more than 4 network hops: Producer → Processor (pick message) → Processor (repartition) → … → Consumer.

