Kafka Consumer Offset

Hi all,

we are using package named “Confluent kafka” in python code to consume the data from kafka topics. after we are consuming the data from kafka topics and processing it for our needs, we are committing the offset manually. Normally, offset is committed automatically but we are controlling the committing offset because after consuming the data if it fails in processing or transforming the data, we will not commit the offset.

With this approach, we are seeing offset is not getting committed sometimes but it supposed to commit. Can you please suggest on this and confirm if this is the best practice to commit offset manually.

Are you committing synchronously or async? Could you share a code snippet?

It’s really a use case-dependent tradeoff decision rather than a best practice. You’ll get granular control to prevent or minimize duplicates, at some code complexity and (potential) performance cost.

Hi @dtroiano,

This is the code snippet which we are using to commit offset manually

from confluent_kafka import Consumer

configuring Kafka consumer

consumer_config = {
‘bootstrap.servers’: ‘xxxx.xxxx.xxx:9092’,
‘group.id’: ‘xyz’,
‘default.topic.config’: {
‘auto.offset.reset’: ‘earliest’
},
‘enable.auto.commit’: False
}
all_topic_consumer = Consumer(**consumer_config)

subscribing to the kafka topic

all_topic_consumer.subscribe([‘topic_x’])

consuming topic messages

all_topics_batch = all_topic_consumer.consume(
num_messages=10000,
timeout=30)

doing some operations on consumed messeges

if len(all_topics_batch) > 0:
#Operations
pass

commiting the consumer

all_topic_consumer.commit()

Please check and let us know if there are any changes to be done on the code level

Since you’re committing async, try adding a commit callback to see if any errors crop up in the callback. For example:

def on_commit(err, partitions):
    if err is not None:
        print("err is " + str(err))
    else:
        print("no error on commit, committed: " + str(partitions))

And add it to your consumer config:

consumer_config = {
    'bootstrap.servers': 'xxxx.xxxx.xxx:9092',
    'group.id': 'xyz',
    'default.topic.config': {
        'auto.offset.reset': 'earliest'
    },
    'enable.auto.commit': False,
    'on_commit': on_commit
}

Hi @dtroiano,

i have tried the code and captured the error message in which it is showing “Exceeded max.poll.interval”

I consume messages from a topic, process them, and commit the offsets after processing is complete. However, sometimes the message processing takes longer than the max.poll.interval.ms, which causes the consumer to be considered idle or dead, leading to rebalancing in the consumer group.

Can you suggest any solutions to keep the consumer alive and avoid this issue?

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.