I have a KafkaConsumer that is stuck in a “joining group” loop.
It’s a simple kafka-python consumer; here is the script:
from kafka import KafkaConsumer
import logging
import sys

# Send the kafka-python client's own logs to stdout so the
# group-join activity is visible
root = logging.getLogger('kafka')
root.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)

bootstrap_servers_sasl = [
    'node1.dev.company.local:9092',
    'node2.dev.company.local:9092',
    'node3.dev.company.local:9092',
]
topicName = 'test_sasl'

# Passing group_id makes the consumer join a consumer group
# via the group coordinator
consumer = KafkaConsumer(
    topicName,
    bootstrap_servers=bootstrap_servers_sasl,
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='test_user',
    sasl_plain_password='t3st_us3r',
    group_id='test_group',
)

try:
    for message in consumer:
        print(f"Received message: {message.value.decode('utf-8')}")
except Exception as e:
    print(f"An exception occurred: {e}")
finally:
    consumer.close()
When I include group_id when creating the KafkaConsumer, the following appears in the log over and over, forever, and the consumer never sees any of the messages published to the topic it is supposed to be monitoring:
2023-09-13 08:35:44,102 - kafka.cluster - INFO - Group coordinator for test_group is BrokerMetadata(nodeId='coordinator-0', host='node1.dev.company.local', port=9092, rack=None)
2023-09-13 08:35:44,102 - kafka.coordinator - INFO - Discovered coordinator coordinator-0 for group test_group
2023-09-13 08:35:44,102 - kafka.coordinator - INFO - (Re-)joining group test_group
2023-09-13 08:35:44,104 - kafka.coordinator - WARNING - Marking the coordinator dead (node coordinator-0) for group test_group: [Error 16] NotCoordinatorForGroupError.
If I don’t include group_id, everything works fine.
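To isolate whether only the group-join path is broken, here is a minimal variant I could run (a sketch using the same topic, credentials, and the single partition 0 shown in the --describe output below): assign() skips the JoinGroup/rebalance handshake entirely, while keeping group_id means offset commits still have to find the group coordinator. If this also spins on coordinator discovery, that would point at the coordinator lookup itself (i.e. the __consumer_offsets topic) rather than the rebalance protocol.
from kafka import KafkaConsumer, TopicPartition

# Same cluster and SCRAM credentials as the script above
consumer = KafkaConsumer(
    bootstrap_servers=[
        'node1.dev.company.local:9092',
        'node2.dev.company.local:9092',
        'node3.dev.company.local:9092',
    ],
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='test_user',
    sasl_plain_password='t3st_us3r',
    group_id='test_group',  # kept so offset commits still involve the coordinator
)

# Manual assignment bypasses the consumer-group join;
# test_sasl has a single partition (0) per the --describe output below
consumer.assign([TopicPartition('test_sasl', 0)])

for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")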
The Kafka brokers are Confluent Kafka, version 7.4.1-ccs.
The controller.log on the Kafka server side contains this:
[2023-09-13 08:56:03,345] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 HashMap() (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] DEBUG [Controller id=0] Topics not in preferred replica for broker 1 HashMap() (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] DEBUG [Controller id=0] Topics not in preferred replica for broker 2 HashMap() (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController)
I’m not sure whether that indicates a problem or is just routine controller activity.
Here is the server.properties file (from node1; there are three nodes in total):
broker.id=0
listeners=LISTENER_ONE://:9092,LISTENER_TWO://:9096
inter.broker.listener.name=LISTENER_ONE
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=true
security.protocol=SASL_PLAINTEXT
advertised.listeners=LISTENER_ONE://node1.dev.company.local:9092,LISTENER_TWO://node1.dev.company.local:9096
listener.security.protocol.map=LISTENER_ONE:SASL_PLAINTEXT,LISTENER_TWO:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1.dev.company.local:2181,node2.dev.company.local:2181,node3.dev.company.local:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3
auto.create.topics.enable=false
inter.broker.protocol.version=3.4-IV0
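Since the listener map above exposes LISTENER_TWO on port 9096 as plain PLAINTEXT, one cross-check would be to point the same consumer at 9096 with no SASL settings at all (a sketch, assuming 9096 is reachable from the client and no ACLs block the ANONYMOUS principal); if the group join succeeds there, the problem is confined to the SASL path:
from kafka import KafkaConsumer

# LISTENER_TWO (port 9096) is mapped to PLAINTEXT in server.properties,
# so no SASL settings are needed here
consumer = KafkaConsumer(
    'test_sasl',
    bootstrap_servers=[
        'node1.dev.company.local:9096',
        'node2.dev.company.local:9096',
        'node3.dev.company.local:9096',
    ],
    group_id='test_group',
)

for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")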
I also ran kafka-console-consumer from one of the broker machines to test:
kafka-console-consumer --bootstrap-server node1.dev.company.local:9092,node2.dev.company.local:9092,node3.dev.company.local:9092 --group test_group --topic test_sasl --consumer.config consumer.config
With the following in consumer.config:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="test_user" \
password="t3st_us3r";
I don’t get any errors, but also no indication that it is processing any of the test messages I produce. (When I run it without --group, I also see no indication that it is processing messages, which differs from what I see with the kafka-python client, where omitting group_id works.)
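One way to confirm the test messages are actually landing on the topic is a minimal kafka-python producer with the same SASL settings (a sketch; the synchronous .get() forces the send to either be acknowledged by the broker or raise):
from kafka import KafkaProducer

# Same cluster and SCRAM credentials as the consumer script above
producer = KafkaProducer(
    bootstrap_servers=[
        'node1.dev.company.local:9092',
        'node2.dev.company.local:9092',
        'node3.dev.company.local:9092',
    ],
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='test_user',
    sasl_plain_password='t3st_us3r',
)

# Block until the broker acknowledges the write (raises on failure)
future = producer.send('test_sasl', b'hello from the test producer')
record_md = future.get(timeout=30)
print(f"Wrote to partition {record_md.partition} at offset {record_md.offset}")
producer.close()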
I ran the following command:
kafka-topics --bootstrap-server node1.dev.company.local:9092,node2.dev.company.local:9092,node3.dev.company.local:9092 --describe --topic test_sasl --command-config admin-plaintext.config
Where admin-plaintext.config contains this:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin-username" \
password="admin-password";
The response is:
Topic: test_sasl TopicId: Y3hiju-ZQsOvdRgLk4vpZw PartitionCount: 1 ReplicationFactor: 2 Configs: cleanup.policy=delete,segment.bytes=1073741824,retention.ms=86400000,unclean.leader.election.enable=true
Topic: test_sasl Partition: 0 Leader: 2 Replicas: 2,0 Isr: 0,2
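Since NotCoordinatorForGroupError concerns the group coordinator, and the coordinator for a group is the leader of that group’s partition of the internal __consumer_offsets topic, the same describe command pointed at __consumer_offsets should show whether those partition leaders and ISRs look healthy:
kafka-topics --bootstrap-server node1.dev.company.local:9092,node2.dev.company.local:9092,node3.dev.company.local:9092 --describe --topic __consumer_offsets --command-config admin-plaintext.config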
I can provide any other information, but I’m not sure what would be most useful to include.