I’m trying to create a Kafka Producer inside a Lambda function, with exactly-once delivery enabled, to push messages to MSK.
Edit: MSK IAM authentication is used as the security protocol between Kafka and the clients.
However, even though I think I’ve set up all the configurations correctly, the Producer still can’t write messages to MSK.
It hangs on the call to init_transactions() and logs the following debug messages in a loop:
%7|1708348719.300|TXNCOORD|lambda#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (3 broker(s) known)
2024-02-19T14:18:39.338+01:00 %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker in state TRY_CONNECT connecting
2024-02-19T14:18:39.338+01:00 %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker has no address yet: postponing connect
2024-02-19T14:18:39.800+01:00 %7|1708348719.800|CONNECT|lambda#producer-1| [thrd:main]: Cluster connection already in progress: acquire ProducerID
2024-02-19T14:18:39.800+01:00 %7|1708348719.800|PIDBROKER|lambda#producer-1| [thrd:main]: No brokers available for Transactions (3 broker(s) known)
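This does not explain why no transaction coordinator is found, but it keeps the Lambda from hanging while the issue is investigated: pass an explicit timeout to `init_transactions()` and retry only errors that the client marks as retriable. A minimal sketch, assuming `producer` is a `confluent_kafka.Producer` (duck-typed here so it can be exercised without a broker). The helper name `init_transactions_with_retry`, the 30-second timeout, the attempt count, and the backoff are illustrative choices, not values from the original setup.

```python
import time
from typing import Any


def init_transactions_with_retry(producer: Any, timeout_s: float = 30.0,
                                 attempts: int = 3, backoff_s: float = 1.0) -> None:
    """Call producer.init_transactions() with a bounded timeout (hypothetical helper).

    confluent_kafka raises KafkaException whose args[0] is a KafkaError; only
    errors whose retriable() returns True are retried, anything else is re-raised.
    """
    for attempt in range(1, attempts + 1):
        try:
            # Blocks for at most timeout_s seconds per attempt.
            producer.init_transactions(timeout_s)
            return
        except Exception as exc:
            err = exc.args[0] if exc.args else None
            retriable = callable(getattr(err, "retriable", None)) and err.retriable()
            if not retriable or attempt == attempts:
                raise
            time.sleep(backoff_s * attempt)  # linear backoff between attempts
```

If every attempt fails, the Lambda invocation ends with the last error instead of spinning on the coordinator query until the function itself times out.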
I have tried changing the number of brokers (from 2 to 4, which did not help) and adjusting transaction.state.log.replication.factor, transaction.state.log.min.isr, and offsets.topic.replication.factor (even setting all of them to 1 did not help).
The advice in the thread "Problems with Amazon MSK default configuration and publishing with transactions" did not help either.
I have the following configuration and settings for the AWS MSK cluster:
- 4 brokers across 2 Availability Zones
- Kafka 2.8.1
- Broker instance type: kafka.t3.small (I’ve checked, and the same happens on kafka.m5.large)
Cluster Configs:
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
offsets.topic.replication.factor=3
min.insync.replicas=2
default.replication.factor=3
auto.create.topics.enable=true
num.io.threads=8
num.network.threads=2
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.session.timeout.ms=18000
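Since these replication values were adjusted while debugging, a quick consistency check against the broker count can rule out one class of coordinator failures. The sketch below (hypothetical helpers `parse_properties` and `check_transaction_settings`) encodes two Kafka constraints: an internal topic such as `__transaction_state` cannot be created with a replication factor larger than the number of available brokers, and a `min.isr` larger than its replication factor means `acks=all` writes can never be satisfied. It only inspects the values as written; it cannot confirm what MSK actually applied to the cluster.

```python
RF_KEYS = (
    "transaction.state.log.replication.factor",
    "offsets.topic.replication.factor",
    "default.replication.factor",
)
# (min ISR key, replication factor key it must not exceed)
ISR_PAIRS = (
    ("transaction.state.log.min.isr", "transaction.state.log.replication.factor"),
    ("min.insync.replicas", "default.replication.factor"),
)


def parse_properties(text: str) -> dict[str, str]:
    """Parse server.properties-style 'key=value' lines into a dict."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_transaction_settings(props: dict[str, str], broker_count: int) -> list[str]:
    """Return a list of detected conflicts; only explicitly set keys are checked."""
    problems = []
    for key in RF_KEYS:
        if key in props and int(props[key]) > broker_count:
            problems.append(f"{key}={props[key]} exceeds the {broker_count} available broker(s)")
    for isr_key, rf_key in ISR_PAIRS:
        if isr_key in props and rf_key in props and int(props[isr_key]) > int(props[rf_key]):
            problems.append(f"{isr_key}={props[isr_key]} is larger than {rf_key}={props[rf_key]}")
    return problems
```

With the transaction and replication values listed above and 4 brokers, `check_transaction_settings` returns an empty list; with only 2 brokers it reports the three replication factors of 3.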
Producer Configs:
- Using Confluent Kafka for Python to create the Producer
{
"client.id": "some_id",
"acks": "all",
"enable.idempotence": "true",
"transactional.id": "123",
}
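For reference, once `init_transactions()` succeeds, a transactional send with confluent-kafka usually follows a begin/produce/commit pattern and aborts on failure. A minimal sketch, assuming `producer` is a `confluent_kafka.Producer` built from the configuration above (duck-typed so the control flow can be exercised without a broker); `send_in_transaction` and the timeout value are hypothetical.

```python
from typing import Any, Iterable, Tuple


def send_in_transaction(producer: Any, topic: str,
                        messages: Iterable[Tuple[bytes, bytes]],
                        timeout_s: float = 30.0) -> None:
    """Produce all (key, value) pairs in one transaction, or abort it."""
    producer.begin_transaction()
    try:
        for key, value in messages:
            producer.produce(topic, key=key, value=value)
        # commit_transaction() flushes outstanding messages before committing.
        producer.commit_transaction(timeout_s)
    except Exception:
        # Abort so read_committed consumers never see the partial batch,
        # then propagate the original error.
        producer.abort_transaction(timeout_s)
        raise
```

A production handler would typically also inspect the `KafkaError` (for example `fatal()` or `txn_requires_abort()`) before deciding whether to abort, retry the commit, or recreate the Producer.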
P.S. If I don’t set acks, enable.idempotence, and transactional.id, the Producer works fine, but that defeats the purpose of raising this issue in the first place.
UPD: After digging through the MSK broker logs, it seems that the connection is not being established. I have no idea why, especially because the auth method is the same for the transactional and non-transactional Producers, and the non-transactional Producer connects just fine.
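To make that comparison explicit, both Producer configurations can be built from one shared authentication block, so the transactional variant differs only in the three keys from the P.S. This is a sketch under an assumption: that IAM auth is wired into confluent-kafka through SASL/OAUTHBEARER with an `oauth_cb`, as shown in the README of AWS's `aws-msk-iam-sasl-signer-python` package (its `MSKAuthTokenProvider.generate_auth_token(region)` returns a token and an expiry in milliseconds). The token function is injected here, and `build_producer_config`, `make_oauth_cb`, and the bootstrap string are placeholders.

```python
from typing import Any, Callable, Dict, Optional, Tuple

TokenFn = Callable[[], Tuple[str, float]]  # returns (token, expiry in ms)


def make_oauth_cb(generate_token: TokenFn) -> Callable[[Any], Tuple[str, float]]:
    """Adapt a token generator to confluent-kafka's oauth_cb contract.

    oauth_cb must return (token, expiry in seconds since the epoch); the MSK
    signer reports the expiry in milliseconds, hence the division.
    """
    def oauth_cb(_oauthbearer_config: Any) -> Tuple[str, float]:
        token, expiry_ms = generate_token()
        return token, expiry_ms / 1000
    return oauth_cb


def build_producer_config(bootstrap: str, generate_token: TokenFn,
                          transactional_id: Optional[str] = None) -> Dict[str, Any]:
    """Shared IAM auth settings; transactional settings are layered on top."""
    config: Dict[str, Any] = {
        "bootstrap.servers": bootstrap,
        "client.id": "some_id",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "OAUTHBEARER",
        "oauth_cb": make_oauth_cb(generate_token),
    }
    if transactional_id is not None:
        # The only keys that differ between the working and the hanging Producer.
        config.update({
            "acks": "all",
            "enable.idempotence": "true",
            "transactional.id": transactional_id,
        })
    return config
```

In the Lambda, `generate_token` could be something like `lambda: MSKAuthTokenProvider.generate_auth_token(region)`, and each resulting dict passed to `confluent_kafka.Producer(...)`; keeping the auth block in one place guarantees the two Producers really do authenticate identically.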