Kafka producer times out during initTransactions while one broker is down

I'm using Kafka 2.7.0 and have created a transactional Kafka producer, but it hangs and then times out while initializing the transaction. My Kafka cluster has 3 brokers.

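import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaTransactionRecoveryExample {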

    public static void main(String[] args) throws InterruptedException {
        // Kafka producer properties
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "MyBroker1:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());
        producerProps.put("acks", "all");
        producerProps.put("enable.idempotence", "true");
        producerProps.put("transactional.id", "transaction");
        
        producerProps.put("request.timeout.ms",  12000);
        producerProps.put("transaction.timeout.ms",  12000);
        producerProps.put("max.block.ms",  12000); 
        producerProps.put("batch.size", 60000);
        producerProps.put("retries", 3); // Number of retries before giving up
        producerProps.put("delivery.timeout.ms", 30000); // Increase delivery timeout to 30 seconds
        producerProps.put("request.timeout.ms",  30000); // Increase request timeout to 30 seconds   
        producerProps.put("transaction.state.log.replication.factor",  1);
        producerProps.put("transaction.state.log.min.isr",  1); 
        
        KafkaProducer<String, String> producer1 = null;
        try {
            producer1 = new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());

            // Initialize producer's transaction
            producer1.initTransactions(); // the call hangs here and then times out
            System.out.println("initTransactions");

        .....
        }
    }
}

Output (While MyBroker1 is down):

[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Overriding the default enable.idempotence to true since transactional.id is specified.
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Instantiated a transactional producer.
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Overriding the default acks to all since idempotence is enabled.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1695122942876
[main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Invoking InitProducerId for the first time in order to acquire a producer ID
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Cluster ID: m4HYe1sbTX6I9IScDjS04A

[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)

......

[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)

**Exception 2: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 1000milliseconds while awaiting InitProducerId**

**Broker appears to be down. Waiting and retrying...**

[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)

It seems the Kafka transaction is never initialized. I have tried adding various properties but still cannot resolve the issue. The error also occurs only when using Kafka transactions.

producerProps.put("transaction.state.log.replication.factor", 1);

You should set this to 3. Otherwise the transaction coordinator cannot fail over when one broker is down, which can cause the timeout you observe.

producerProps.put("transaction.state.log.min.isr", 1);

You should also set this to 2; otherwise correctness cannot be guaranteed.

producerProps.put("retries", 3);

Finally, you should set this to Integer.MAX_VALUE to get effectively infinite retries.
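
As background for the first point, here is a minimal sketch of why a replication factor of 1 prevents failover. To my understanding of the broker code, each transactional.id is hashed to one partition of the internal transaction state log, and the broker leading that partition acts as the transaction coordinator. If that partition has a single replica and its broker is down, no other broker can take over. The class and method names below are hypothetical, and the mapping is an assumption based on my reading of the broker source rather than a public API.

```java
public class TransactionCoordinatorPartition {

    // Assumption: mirrors the broker-side mapping as I understand it.
    // Integer.MIN_VALUE is mapped to 0 because Math.abs cannot represent
    // its absolute value as an int.
    public static int partitionFor(String transactionalId, int transactionLogPartitions) {
        int hash = transactionalId.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % transactionLogPartitions;
    }

    public static void main(String[] args) {
        // 50 is the default value of transaction.state.log.num.partitions.
        int partitions = 50;
        System.out.println("transactional.id 'transaction' -> partition "
                + partitionFor("transaction", partitions));
    }
}
```

The same transactional.id always lands on the same partition, so the availability of that one partition decides whether initTransactions can complete.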

Hi,
Thanks for the reply.
I tried it today after commenting out all the timeouts that I had set on the producer,
but I'm still getting a timeout.

producerProps.put("acks", "all");
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transaction");

// producerProps.put("request.timeout.ms", 30000);
// producerProps.put("transaction.timeout.ms", 30000);
// producerProps.put("max.block.ms", 30000);
// producerProps.put("batch.size", 60000);
producerProps.put("retries", Integer.MAX_VALUE); // Number of retries before giving up
// producerProps.put("delivery.timeout.ms", 30000); // Increase delivery timeout to 30 seconds
// producerProps.put("request.timeout.ms", 30000); // Increase request timeout to 30 seconds
producerProps.put("transaction.state.log.replication.factor", 3);
producerProps.put("transaction.state.log.min.isr", 2);

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
Broker appears to be down. Waiting and retrying

BR,
Chathurika

Well, actually, transaction.state.log.replication.factor and transaction.state.log.min.isr are broker-side configs, not producer configs, so setting them on the producer has no effect. Can you verify that the internal transaction state topic (__transaction_state) has the correct configuration?
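
One way to check this from Java is sketched below, using the Admin client that ships with kafka-clients. It prints the leader, replicas, and in-sync replicas of every __transaction_state partition. The class name is hypothetical, and the bootstrap list is an assumption: it uses the two broker host names that appear in this thread, so the client can still connect while MyBroker1 is down. The sketch targets the 2.7 client; on clients from 3.1 onward, allTopicNames() is the replacement for all().

```java
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;

public class DescribeTransactionStateTopic {

    private static final String TOPIC = "__transaction_state";

    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        // Assumption: adjust to your own brokers; more than one entry lets
        // the client bootstrap even if one of them is unavailable.
        adminProps.put("bootstrap.servers", "MyBroker1:9092,MyBroker2:9092");

        try (Admin admin = Admin.create(adminProps)) {
            TopicDescription description = admin
                    .describeTopics(Collections.singleton(TOPIC))
                    .all()
                    .get()
                    .get(TOPIC);

            for (TopicPartitionInfo partition : description.partitions()) {
                Node leader = partition.leader(); // null when no leader is available
                String replicas = partition.replicas().stream()
                        .map(node -> String.valueOf(node.id()))
                        .collect(Collectors.joining(","));
                String isr = partition.isr().stream()
                        .map(node -> String.valueOf(node.id()))
                        .collect(Collectors.joining(","));
                System.out.printf("partition=%d leader=%s replicas=[%s] isr=[%s]%n",
                        partition.partition(),
                        leader == null ? "none" : String.valueOf(leader.id()),
                        replicas,
                        isr);
            }
        }
    }
}
```

If every partition lists only a single replica, the topic was created with a replication factor of 1. As far as I know, the broker setting is only applied when the topic is first created, so changing it afterwards would not alter an existing __transaction_state topic; its partitions would have to be reassigned to more replicas.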