Very slow KTable-KTable foreign key join performance

I’ve got 3 topics:

  • topicA: 50k records
  • topicB: 500k records
  • topicC: 20k records

They’re consumed as KTables and joined via foreign-key joins, where the foreign key is a regular property in topicA’s value and the primary key of topicB and topicC:
topicA join topicB leftJoin topicC
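In code, the topology looks roughly like this (the value types RecordA/RecordB/RecordC, the joined types, the getForeignKey() accessor and the output topic are placeholders; serdes and materialization are omitted):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;

public class FkJoinTopology {

  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();

    KTable<String, RecordA> tableA = builder.table("topicA"); // 50k records
    KTable<String, RecordB> tableB = builder.table("topicB"); // 500k records
    KTable<String, RecordC> tableC = builder.table("topicC"); // 20k records

    // Foreign-key join: the FK is a regular field of topicA's value and the
    // primary key of topicB (and topicC).
    KTable<String, JoinedAB> ab = tableA.join(
        tableB,
        RecordA::getForeignKey,           // FK extractor: left value -> right key
        JoinedAB::new);                   // ValueJoiner

    KTable<String, JoinedABC> abc = ab.leftJoin(
        tableC,
        v -> v.getA().getForeignKey(),    // same FK for the left join
        JoinedABC::new);

    abc.toStream().to("output-topic");
    return builder.build();
  }
}
```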

These joins result in:

  • 50k records in a KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic
  • 2 million records in a KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic
  • 2 million records in a KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000017-topic and
  • 2 million records in a KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000032-topic

Now these aren’t serious amounts for Kafka, but it still takes 4 hours for the Streams application to complete. Neither CPU nor memory is the bottleneck, as the transaction rate is a mere few dozen to a few hundred records per second.

This is reproducible and has also been reported by others on Stack Overflow: Kafka Streams KTable-KTable non key join performance on skewed tables - Stack Overflow.

Other kinds of joins, such as KStream-GlobalKTable or KStream-KTable, don’t have this problem.

What can be done to improve the throughput?

Hard to say. Perf issues are always difficult to triage.

Not sure if I fully understand the scenario yet.

  • 50k records in a KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic
  • 2 million records in a KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic

Seems there is a typo: is this 50K or 2M? I assume it’s for the first A join B part?

as the transaction rate is a mere few dozen to a few hundred records per second.

That sounds low. KS should be able to do 10K+ messages/sec for a FK join (of course depending on hardware etc.).

From SO:

More specifically, the speed of emitting responses is proportional to the merchants changes, while the speed of materializing non-key join results into the changelog is limited.

Not sure if I understand what you mean here?

why consuming KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE is slow and limited to some bound

Again, hard to say. In general, for a FK-join there will be two parallel tasks:

  • t1 processes the left-table (merchant in your case) input records, plus everything from the -response topic. This task will also write into the -subscription topic (of course, for each input record there should be exactly one subscription). For each processed response, we need to do a lookup into the left-input table to compute the join result by calling the ValueJoiner.
  • t2 processes the right input table and the -subscription topic. There are two stores, and for each input record from either topic, a lookup (point or range) is done on the other store, and the result (pre-join) is sent back via the -response topic.

Consuming from the response topic is coupled with reading the left input and writing to the subscription topic, and it all happens within the same task (i.e., on the same thread). On the other hand, I would expect the range scans in t2 to be the most expensive part, not consuming the response topic.
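If it helps, one way to see this split for your concrete topology is to print the topology description; each sub-topology lists the internal -subscription and -response topics it reads from and writes to:

```java
// 'builder' is the StreamsBuilder the joins were defined on.
Topology topology = builder.build();
System.out.println(topology.describe());
```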

Other kinds of joins, such as KStream-GlobalKTable or KStream-KTable, don’t have this problem.

Those work way differently internally. But the perf gap should not be as large as you describe.

My solution was a strict separation between deployment and operation of the software, so that different runtime properties can be set (among others, for Kafka). The most important setting, however, was acks: 1 instead of the default acks: all during deployment, resulting in an improvement of an order of magnitude. Although only three brokers are part of the cluster, it was a game-changer; I couldn’t come close with any other tuning attempt. The deployments now take only 10 minutes instead of several hours.
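For reference, this is roughly how the override can be applied when the Streams configuration is built as a plain Properties object (the application id and bootstrap servers below are just placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fk-join-app");    // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

// Override acks only for the producers embedded in the Streams application.
// acks=1 trades durability for throughput; after the deployment phase the
// default acks=all is restored.
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "1");
```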

acks has an impact for sure. But note that with acks=1 there is a risk of data loss, because the producer moves on after the first broker (i.e., the leader) has received the data, before it is replicated. Data loss can happen if the leader broker fails before the data is replicated. It’s not a setting recommended for production.

I am not a broker expert, but there are also ways to tune the brokers and to make replication faster. Maybe this would be the route to go, instead of trading off correctness guarantees for performance.
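Just to illustrate the kind of broker-side settings I mean (not a recommendation, and your mileage may vary), replication parallelism and the durability counterpart of acks=all are controlled by settings like these:

```properties
# server.properties / topic config (illustrative only)
# More fetcher threads per source broker can speed up follower replication.
num.replica.fetchers=4

# With acks=all, a write is only acknowledged once this many replicas have it.
min.insync.replicas=2
```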

I’d have been happy to go another way, but currently there is no viable alternative performance-wise. And as I mentioned, this setting only applies to the short deployment phase, where monitoring would immediately recognise a failure so that the deployment could be repeated. Afterwards, acks: all is set again.