Hi, we are using open source Confluent Kafka deployed in Kubernetes. We have fewer consumers than partitions, so multiple Kafka partitions are assigned to each consumer pod. We observed that some of these partitions have higher lag than others; it seems the consumer is prioritizing fetches from some partitions over others. Our messages vary in size, and we have set the max bytes per message and the overall partition fetch size too. How can we make a Kafka consumer process its assigned partitions in round-robin fashion?
I am aware of partition assignment strategies, but this is different: it concerns how messages are read from a partition after that partition has been assigned to the consumer. I am trying to understand on what basis a Kafka consumer reads messages from its assigned partitions and how we can better ensure fairness among its partitions.
I am trying to understand on what basis a Kafka consumer reads messages from its assigned partitions
This is going to be a two-part answer. Foremost, how Kafka assigns partitions to consumers depends on the assignor configured in the consumer. If you changed nothing, most likely you are using the default assignor, which has some sort of stickiness logic to it. The source of confusion for many people is that in the past, Kafka used to default to RangeAssignor, which gives each consumer a fair share of the data streams stored in the partitions. However, recent versions of Kafka changed the default assignor to the sticky version, where certain consumer threads are pinned to certain partitions to improve throughput and avoid the problems caused by redistributing partitions among consumers during a rebalance. For this reason, if you need to change how Kafka assigns partitions to consumers, change your assignor strategy.
Also, keep in mind that Kafka uses a pull model, not push. This means that when messages are read from the partitions depends entirely on the consumers. They are the ones that invoke poll() to fetch new records, just as if Kafka were a database. So you may need to review the consumer code to see how the polling is occurring. Usually, the consumer code spins up a background thread that polls continuously by invoking poll() in a loop that never stops.
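To make the pull model concrete, here is a minimal sketch of such a background polling loop in Python. It is not taken from any particular client: `poll_loop`, `QueueBackedConsumer`, and `handle` are hypothetical names, and the stand-in consumer only models a `poll(timeout)` call that returns one record or `None`, which is roughly how I would expect a real client object to be plugged in.

```python
import queue
import threading


def poll_loop(consumer, handle, stop_event, timeout=0.1):
    """Keep polling until asked to stop; hand every record to `handle`."""
    while not stop_event.is_set():
        record = consumer.poll(timeout)  # None when nothing arrived in time
        if record is not None:
            handle(record)


class QueueBackedConsumer:
    """Hypothetical stand-in for a real client; only poll() is modeled."""

    def __init__(self, records):
        self._records = queue.Queue()
        for record in records:
            self._records.put(record)

    def poll(self, timeout):
        try:
            return self._records.get(timeout=timeout)
        except queue.Empty:
            return None


seen = []
stop = threading.Event()


def handle(record):
    seen.append(record)
    if len(seen) == 3:  # demo only: stop once the small backlog is drained
        stop.set()


# The consumer never receives anything on its own; records move only
# because this thread keeps calling poll().
worker = threading.Thread(
    target=poll_loop,
    args=(QueueBackedConsumer(["r1", "r2", "r3"]), handle, stop),
    daemon=True,
)
worker.start()
worker.join(timeout=5)
```

In a real service the stop event would typically be set by a shutdown hook rather than by the handler; the handler sets it here only so the demo terminates.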
and how can we better ensure fairness among its partitions?
By changing how the consumers fetch new messages from Kafka. As mentioned before, partition assignment and message consumption are the responsibility of the assignor and the consumers. Kafka is just a dumb pipe; it doesn't process anything. It just serves data to whoever subscribes to the partitions. I wrote a blog post that discusses this idea and introduces a design pattern called Bucket Priority that you may be interested in. The implementation of this design pattern can be found here.
@riferrei Thanks for the details, and thanks for your blog on Bucket Priority. However, our requirement is slightly different. If 2 partitions are assigned to the same consumer, how does that consumer read messages from each partition equally without adding lag on either of them? As of now, when our consumer does a poll(), it reads from one partition completely (as reported by Grafana) and then reads from the next partition, especially when we restart the consumer.
Based on your comment, we suspected the problem was due to the default sticky assignor, so we changed it to the round-robin assignor, but there was no change in behavior.
What you are describing seems to be the normal behavior of the Kafka consumer regardless of the assignor configured. Assignors only handle which partitions will be assigned to the consumer threads — but it is up to each thread when to fetch records from the partitions. On each poll, the consumer thread will fetch records from one partition sequentially, starting from the last committed offset from that consumer group. Once it finishes with that partition, then it will start fetching from the next partition — if there is one assigned to that consumer thread as well.
This behavior is because of how Kafka organizes data, which is based on the immutable commit log data structure. The consumer thread doesn’t “load balance” between partitions during the poll() call. It would be too expensive from the network standpoint, as partitions may live in different brokers. It will try to fetch as many records as possible from one partition before jumping into the other. The code below has the internal method that is invoked because of a poll.
To get fine control over how polling fetches records from multiple partitions, you may need to change your consumer thread code to:
Assign partitions manually.
Use primitives like seek(), pause(), resume(), and poll() to load balance records consumption from partitions.
Manually commit the offsets.
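As a rough illustration of the pause()/resume() step, the sketch below rotates which partition is allowed to deliver on each poll. It is an assumption-laden sketch, not the Bucket Priority implementation and not a client API: `RoundRobinPoller` and `InMemoryConsumer` are hypothetical names, and the consumer is any object that exposes `pause(partitions)`, `resume(partitions)`, and `poll(timeout)` returning one record or `None`. Manual assignment, seek(), and offset commits are deliberately left out.

```python
from collections import deque


class RoundRobinPoller:
    """Let exactly one assigned partition deliver per poll, in rotation."""

    def __init__(self, consumer, partitions):
        self._consumer = consumer
        self._ring = deque(partitions)
        # Start with everything paused so no partition can jump the queue.
        self._consumer.pause(list(self._ring))

    def poll_next(self, timeout=1.0):
        partition = self._ring[0]
        self._ring.rotate(-1)  # the next call serves the following partition
        self._consumer.resume([partition])
        try:
            record = self._consumer.poll(timeout)
        finally:
            self._consumer.pause([partition])
        return partition, record


class InMemoryConsumer:
    """Hypothetical stand-in that always drains the first unpaused partition."""

    def __init__(self, backlog):
        self._backlog = {p: deque(records) for p, records in backlog.items()}
        self._paused = set()

    def pause(self, partitions):
        self._paused.update(partitions)

    def resume(self, partitions):
        self._paused.difference_update(partitions)

    def poll(self, timeout):
        for partition, records in self._backlog.items():
            if partition not in self._paused and records:
                return records.popleft()
        return None


consumer = InMemoryConsumer({"p0": ["a1", "a2", "a3"], "p1": ["b1", "b2", "b3"]})
poller = RoundRobinPoller(consumer, ["p0", "p1"])
order = [poller.poll_next()[1] for _ in range(4)]
# order alternates between the two partitions: a1, b1, a2, b2
```

Without the rotation, the stand-in would hand out a1, a2, a3 before touching p1, which mirrors the lag pattern described in the question. With a real client, pausing and resuming on every poll has its own cost, so it is worth measuring before adopting a scheme like this.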
You may also need to review how long it takes to process each record. Because records need to be fully processed before their offsets are committed, the time it takes to process a record directly affects how many records can be processed on each poll without causing delays.
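A quick back-of-the-envelope calculation shows why this matters. The numbers are made up for illustration (500 records per poll happens to match the Java client's default max.poll.records, 20 ms per record is an assumption), and both function names are hypothetical.

```python
def batch_processing_ms(records_per_poll, per_record_ms):
    """Time spent on one batch before the consumer can poll again."""
    return records_per_poll * per_record_ms


def max_records_for_budget(budget_ms, per_record_ms):
    """Largest batch that still fits in the delay you are willing to accept."""
    return budget_ms // per_record_ms


# Assumed workload: 500 records per poll, 20 ms of processing per record.
delay_ms = batch_processing_ms(500, 20)

# If the other partitions should wait at most 2 seconds between polls:
batch_limit = max_records_for_budget(2000, 20)
```

Here one batch keeps the consumer busy for 10,000 ms, so records on the other partitions wait about 10 seconds; capping the batch at 100 records brings that down to about 2 seconds.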
An easy way to avoid all this mess is to use an assignment strategy that gives each partition its own consumer thread (pod, in your case). Then, naturally, all partitions will have their records fetched concurrently, as each is processed by a dedicated client thread.