Hi Kafka community,
I’m working on a Python script to consume messages from a Kafka topic, and I’m facing an issue where the script runs continuously without producing any output. I can see the messages with the Kafka console consumer, but the Python script prints nothing. Here is the script:
from confluent_kafka import Consumer, KafkaError
import requests
import json

# Kafka configuration
kafka_bootstrap_servers = '172.x.x.x:9092'
kafka_topic = 'test-topic'
kafka_group_id = 'test-consumer-group'

# REST API configuration
api_url = 'https://172.x.x.x/api/v1/webhooks/cnas/test-topics?st2-api-key=ODFjZGJkNzRjZDQ5MDk1ZDljMzMxYTk5MzE5OWQ0YzU3N2ZmODBlZGViOWRiMjBjZDMxM2Q1OGNjYWQyNTgxYAB'

# Create Kafka consumer
consumer_conf = {
    'bootstrap.servers': kafka_bootstrap_servers,
    'group.id': kafka_group_id,
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_conf)
consumer.subscribe([kafka_topic])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break
        # Print received Kafka message
        print(f"Received Kafka message: {msg.value().decode('utf-8')}")
        # Process the received message
        try:
            message_value = json.loads(msg.value().decode('utf-8'))
            # Trigger REST API with the message content
            response = requests.post(api_url, json=message_value, verify=False)
            print(f"API response: {response.text}")
            if response.status_code in [202, 200]:
                print(f"API call successful for message: {message_value}")
                consumer.commit()  # Manually commit offset
            else:
                print(f"API call failed with status code {response.status_code}")
        except Exception as e:
            print(f"Error processing message: {e}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
Configuration Details:
Kafka Version: 3.5.0
Consumer Group ID: ‘test-consumer-group’
Kafka Bootstrap Servers: ‘172.x.x.x:9092’
Kafka Topic: ‘test-topic’
Observations:
- Checked Kafka logs for errors (no relevant errors found).
- Confirmed that the topic ‘test-topic’ exists using the Kafka console consumer.
- Tried modifying the script and experimented with different configurations.
What could be the reasons for a Python Kafka consumer script running continuously without producing output?
Are there any specific configurations or settings I should check in my script or Kafka setup?
Additional Context:
- Using the Confluent Kafka Python library (confluent_kafka), version 2.3.0
- The script worked previously. The issue appeared after the VM restarted and the associated services (Zookeeper and Kafka) were restarted. Before that, I was able to see the response code of the API call the script triggered.
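Since the loop above silently continues whenever poll() returns None, the script gives no hint about whether it reaches the broker or ever gets partitions assigned. Below is a minimal diagnostic sketch I can run to narrow this down. The helper names (diagnostic_conf, on_assign, run_diagnostics) are made up for illustration, and the choice of debug contexts is only my guess at what is useful here, not a recommended setting.

```python
def diagnostic_conf(base_conf):
    """Return a copy of base_conf with extra client-side logging enabled."""
    conf = dict(base_conf)
    # librdkafka debug contexts; this particular selection is an assumption
    conf['debug'] = 'broker,cgrp,fetch'
    # Surface client-level errors that poll() would otherwise not report
    conf['error_cb'] = lambda err: print(f"Client error: {err}")
    return conf


def on_assign(consumer, partitions):
    """Called after a rebalance; shows whether any partitions were assigned."""
    print(f"Assigned partitions: {partitions}")


def run_diagnostics(base_conf, topic, polls=10):
    """Check metadata, subscribe, and report what each poll() returns."""
    # Imported here so the helpers above can be used without the library
    from confluent_kafka import Consumer

    consumer = Consumer(diagnostic_conf(base_conf))
    try:
        # Raises if the broker cannot be reached within the timeout
        metadata = consumer.list_topics(topic, timeout=10)
        print(f"Partitions for {topic}: {list(metadata.topics[topic].partitions)}")

        consumer.subscribe([topic], on_assign=on_assign)
        for _ in range(polls):
            msg = consumer.poll(1.0)
            if msg is None:
                print("poll() returned None (no message within 1s)")
            elif msg.error():
                print(f"poll() error: {msg.error()}")
            else:
                print(f"Got message at offset {msg.offset()}")
    finally:
        consumer.close()


if __name__ == '__main__':
    run_diagnostics(
        {
            'bootstrap.servers': '172.x.x.x:9092',
            'group.id': 'test-consumer-group',
            'auto.offset.reset': 'earliest',
        },
        'test-topic',
    )
```

If on_assign never fires, or the metadata call times out, that would point at connectivity or the consumer group rather than the processing code. One thing I am unsure about: as far as I understand, 'auto.offset.reset' only applies when the group has no committed offset, so an existing group would not re-read old messages.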