Commit offsets with a new KafkaConsumer object in python

Requirement

I need to periodically (once a day) fetch messages from a Kafka topic and write all these messages to a Slack channel.

Design

I am using Temporal, an activity-workflow based model that implements resiliency. I have modeled three activities:

  1. Fetch Messages: Fetch a batch of messages from the Kafka topic.
  2. Post to Slack: Post each message to Slack (this activity is called for each message).
  3. Commit Offsets: Commit the offsets to Kafka so that the next read starts from the committed offset.

The main point here is that the above three activities are completely independent.

Problem Statement

The issue I’m encountering is that the activity responsible for committing offsets runs into the following error:

swift

Copy code

{"message":"CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max_poll_interval_ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the rebalance timeout with max_poll_interval_ms, or by reducing the maximum size of batches returned in poll() with max_poll_records.","source":"","stackTrace":"  File \"/path/to/file\", line 447, in _run_activity\n    result = await impl.execute_activity(input)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/path/to/file\", line 703, in execute_activity\n    return await input.fn(*input.args)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/path/to/file\", line 140, in commit_kafka_offsets\n    consumer.commit(offsets=offsets)\n\n  File \"/path/to/file\", line 527, in commit\n    self._coordinator.commit_offsets_sync(offsets)\n\n  File \"/path/to/file\", line 521, in commit_offsets_sync\n    raise future.exception # pylint: disable-msg=raising-bad-type\n    ^^^^^^^^^^^^^^^^^^^^^^\n","encodedAttributes":null,"cause":null,"applicationFailureInfo":{"type":"CommitFailedError","nonRetryable":false,"details":null}}

I tried setting max_poll_interval_ms to a very high value, but that doesn’t fix the issue. The batches I am processing are small enough that I cannot optimize them further.

Questions

  1. How can I commit the offsets to Kafka effectively with this design?

I can provide the code in Python if that helps.

Which Kafka Python library are you using?

Do you have just one consumer in the consumer group? I don’t know Temporal – is there a new consumer created to commit offsets?

Sure, couldn’t hurt to see the basic poll / commit loop.

“kafka-python” is the library.

Poll loop:


    @activity.defn
    async def fetch_kafka_messages(self, topic: str, group_id: str, batch_size: int) -> Tuple[List[str], List[Dict[str, Any]]]:
        logging.basicConfig(level=logging.DEBUG)

        print("Starting Kafka consumer...")
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=['kafka-cluster-kafka-0.kafka-cluster-kafka-brokers:9092'],
            group_id=group_id,
            auto_offset_reset='earliest',  # Start from the earliest messages if no offset is found
            enable_auto_commit=False,  # Disable auto commit
            value_deserializer=lambda x: x.decode('utf-8'),
            max_poll_interval_ms=6000000
        )

        print("Kafka consumer started.")
        messages = []
        partitions_to_commit = []
        total_messages_fetched = 0

        while total_messages_fetched < batch_size:
            msg_pack = consumer.poll(timeout_ms=1000)  # Poll for 1 second

            if not msg_pack:
                break

            for tp, messages_in_partition in msg_pack.items():
                for message in messages_in_partition:
                    print(f"Received message: {message.value}")
                    messages.append(message.value)
                    total_messages_fetched += 1
                    partitions_to_commit.append({
                        'topic': tp.topic,
                        'partition': tp.partition,
                        'offset': message.offset
                    })
                    if total_messages_fetched >= batch_size:
                        break
                if total_messages_fetched >= batch_size:
                    break

        print("Exiting Kafka consumer.")
        return messages, partitions_to_commit

Commit Loop:

@activity.defn
    async def commit_kafka_offsets(self, topic: str, group_id: str, partitions_to_commit: List[Dict[str, Any]]):
        print("Starting Kafka consumer...")
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=['kafka-cluster-kafka-0.kafka-cluster-kafka-brokers:9092'],
            group_id=group_id,
            auto_offset_reset='earliest',  # Start from the earliest messages if no offset is found
            enable_auto_commit=False,  # Disable auto commit
            value_deserializer=lambda x: x.decode('utf-8'),
            max_poll_interval_ms=6000000
        )

        try:
    # consumer.commit(offsets={TopicPartition(offset['topic'], offset['partition']): OffsetAndMetadata(offset['offset'] + 1, None) for offset in partitions_to_commit})
            # consumer.close()
            print("Committing offsets...")
            offsets = {
                TopicPartition(offset['topic'], offset['partition']): OffsetAndMetadata(offset['offset'] + 1, None)
                for offset in partitions_to_commit
            }
            consumer.commit(offsets=offsets)

        finally:
            consumer.close()

You might try explicitly assigning partitions and seeking in the commit_kafka_offsets function. I.e., don’t provide the topic when instantiating the consumer, and then do:

consumer.assign([TopicPartition(offset['topic'], offset['partition']) for offset in partitions_to_commit])

for offset in partitions_to_commit:
  consumer.seek(TopicPartition(offset['topic'], offset['partition']), offset['offset'] + 1)

# no arg will commit current in-memory offsets
consumer.commit()

There’s some relevant documentation in the Storing Offsets Outside Kafka section here. While you’re not storing offsets outside Kafka, I believe that the advice about seeking and assigning partitions applies in this situation.

That works like a charm. Thanks!

1 Like