MSK Kafka Transactions Support

I’m trying to create a Kafka Producer inside a Lambda Function with Exactly-Once Delivery support enabled to push messages to MSK.

Edit: MSK IAM Auth is used for security protocol between Kafka and clients

However, even though (I think) I’ve setup all the configurations correctly, Producer still can’t write messages to MSK.

Producer hangs upon calling init_transactions() and outputs the following debug messages in a loop:

%7|1708348719.300|TXNCOORD|lambda#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (3 broker(s) known)

2024-02-19T14:18:39.338+01:00	%7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker in state TRY_CONNECT connecting

2024-02-19T14:18:39.338+01:00	%7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker has no address yet: postponing connect

2024-02-19T14:18:39.800+01:00	%7|1708348719.800|CONNECT|lambda#producer-1| [thrd:main]: Cluster connection already in progress: acquire ProducerID

2024-02-19T14:18:39.800+01:00	%7|1708348719.800|PIDBROKER|lambda#producer-1| [thrd:main]: No brokers available for Transactions (3 broker(s) known)

I have tried changing the number of brokers (from 2 to 4 - did not work), playing around with values for the settings transaction.state.log.replication.factor, transaction.state.log.min.isr, offsets.topic.replication.factor (even setting them all to 1 - did not help).
Advice from this thread did not help either Problems with Amazon MSK default configuration and publishing with transactions

I have the following configs and settings for AWS MSK Cluster:

  • 4 Brokers across 2 availability zones
  • Kafka 2.8.1
  • Cluster size: kafka.t3.small (I`ve checked and the same occurs on kafka.m5.large)

Cluster Configs:

transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
offsets.topic.replication.factor=3
min.insync.replicas=2
default.replication.factor=3
auto.create.topics.enable=true
num.io.threads=8
num.network.threads=2
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.session.timeout.ms=18000

Producer Configs:

  • Using Confluent Kafka for Python to create the Producer
{
  "client.id": "some_id",
  "acks": "all",
  "enable.idempotence": "true",
  "transactional.id": "123",
}

P.S. If I don’t use the acks, enable.idempotence and transactional.id - Producer works fine, but it beats the purpose of raising this issue in the first place.

UPD: After digging through logs of MSK brokers, it seems that the connection is not getting established - no idea why, especially because the auth method is the same for transactional and non-transactional Producer and for non-transactional Producer the connection is getting established just fine.

I’d recommend testing the transactional producer on an unauthenticated cluster (leave all other cluster configs the same) so that you can rule security config in or out as the area to debug.

Hi, yep, seems like the issue is with AWS IAM Auth for MSK. Not sure exactly what though, because with IAM Auth and no transactions - everything works just fine, but IAM Auth + Transactions = No connection even attempted to be established on the broker side.
P.S. Trying to establish IAM Auth via Python and aws-msk-iam-sasl-signer library

Possibly it has to do with the authorization policy attached to the role – there are some actions that need to be allowed for transactions to work.

Nope, in order to test this, I`ve given kafka-cluster:* and kafka:* permissions to the role attached to the Lambda - nothing changed

@kojimba were you able to find solution for this ? I am stuck with the similar situation

@dtroiano is there any solution for this ? This happens only with transactional producer with IAM auth

@Shashicr nope, could not fix it unfortunately, only work around it. Here’s my question on stack overflow as well. apache kafka - AWS MSK Transactions Support - Stack Overflow

Since this is MSK / IAM-specific, you might also try asking in AWS re:Post.