Hello Team,
My Confluent Kafka cluster runs in the cloud. I am trying to connect a consumer to it with the configuration and code below:
import uuid
from time import sleep

from confluent_kafka import Consumer

consumer_config = {
    'bootstrap.servers': '<cluster bootstrap server>',
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': '<cluster API key>',
    'sasl.password': '<cluster password>',
    'group.id': str(uuid.uuid1()),  # creates a new consumer group on each invocation
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': 'false',
    'max.poll.interval.ms': '86400000',
}
consumer = Consumer(consumer_config)
consumer.subscribe([Settings.kafka.KAFKA_TOPIC])
logger.info(f"Subscribed to Topic-{Settings.kafka.KAFKA_TOPIC}")
try:
    while True:
        # print("Listening")
        # read a single message at a time
        msg = consumer.poll(0.1)  # wait for a message or event/error
        if msg is None:
            sleep(5)
            continue
        if msg.error():
            logger.error("Error reading message : {}".format(msg.error()))
            continue
        processrequest(msg.key(), msg.topic(), msg.value())
        consumer.commit()
except Exception as ex:
    logger.error("Kafka Exception : {}".format(ex))
finally:
    # close exactly once; this block also runs after an exception
    logger.info("closing consumer")
    consumer.close()
On the Confluent cluster, I have not been granted any consumer permissions.
When I try to connect, I get the following error:
Error reading message : KafkaError{code=GROUP_AUTHORIZATION_FAILED,val=30,str="FindCoordinator response error: Group authorization failed."}
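With the loop as written, this error is only logged and the consumer keeps polling, so it repeats indefinitely. Below is a minimal sketch of how the loop could recognise authorization failures and stop instead. The helper name is_fatal_auth_error is hypothetical and not part of confluent_kafka. The numeric values are the standard Kafka protocol error codes, and 30 matches the val=30 in the error above.

```python
# Standard Kafka protocol error codes for authorization failures.
TOPIC_AUTHORIZATION_FAILED = 29
GROUP_AUTHORIZATION_FAILED = 30
CLUSTER_AUTHORIZATION_FAILED = 31

# Hypothetical helper: retrying will not help until permissions change.
FATAL_AUTH_CODES = {
    TOPIC_AUTHORIZATION_FAILED,
    GROUP_AUTHORIZATION_FAILED,
    CLUSTER_AUTHORIZATION_FAILED,
}


def is_fatal_auth_error(code):
    """Return True if the numeric error code is an authorization failure."""
    return code in FATAL_AUTH_CODES
```

In the poll loop, the value passed in would be msg.error().code(), and a True result could break out of the loop rather than continue.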
Does a consumer permission need to be enabled on the Confluent cluster side? If so, please let me know which permission I need to enable or add.
Thanks in advance.