Python Kafka Consumer Script Running Continuously Without Output

Hi Kafka community,

I’m working on a Python script to consume messages from a Kafka topic, and I’m facing an issue where the script runs continuously without producing any output. I’m able to see messages using the Kafka console consumer, but the Python script doesn’t seem to be working as expected.

from confluent_kafka import Consumer, KafkaError
import requests
import json

# Kafka configuration
kafka_bootstrap_servers = '172.x.x.x:9092'
kafka_topic = 'test-topic'
kafka_group_id = 'test-consumer-group'

# REST API configuration
api_url = 'https://172.x.x.x/api/v1/webhooks/cnas/test-topics?st2-api-key=ODFjZGJkNzRjZDQ5MDk1ZDljMzMxYTk5MzE5OWQ0YzU3N2ZmODBlZGViOWRiMjBjZDMxM2Q1OGNjYWQyNTgxYAB'

# Create Kafka consumer
consumer_conf = {
    'bootstrap.servers': kafka_bootstrap_servers,
    'group.id': kafka_group_id,
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_conf)
consumer.subscribe([kafka_topic])



try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        # Print received Kafka message
        print(f"Received Kafka message: {msg.value().decode('utf-8')}")

# Process the received message
        try:
            message_value = json.loads(msg.value().decode('utf-8'))

            # Trigger REST API with the message content
            response = requests.post(api_url, json=message_value, verify=False)

            print(f"API response: {response.text}")

 
            if response.status_code in [202, 200]:
                print(f"API call successful for message: {message_value}")
                consumer.commit()  # Manually commit offset
            else:
                print(f"API call failed with status code {response.status_code}")

        except Exception as e:
            print(f"Error processing message: {e}")
except KeyboardInterrupt:
    pass


finally:
    consumer.close()

Configuration Details:
Kafka Version: 3.5.0
Consumer Group ID: ‘test-consumer-group’
Kafka Bootstrap Servers: ‘172.x.x.x:9092’
Kafka Topic: ‘test-topic’

Observations:

  • Checked Kafka logs for errors (no relevant errors found).
  • Confirmed that the topic ‘test-topic’ exists using the Kafka console consumer.
  • Tried modifying the script and experimented with different configurations.

What could be the reasons for a Python Kafka consumer script running continuously without producing output?
Are there any specific configurations or settings I should check in my script or Kafka setup?

Additional Context:

  • Using Confluent Kafka Python library version 2.3.0
  • The script worked previously, but the issue appeared after the VM restarted and the associated services Zookeeper and Kafka restarted, before that I was able to view the response code of the API triggered.