I'm using Kafka 2.7.0 and have created a Kafka producer with transactions enabled. The producer hangs and then times out while initializing the transaction (the call to initTransactions()). My Kafka cluster has 3 brokers.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaTransactionRecoveryExample {
    public static void main(String[] args) throws InterruptedException {
        // Kafka producer properties
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "MyBroker1:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());
        producerProps.put("acks", "all");
        producerProps.put("enable.idempotence", "true");
        producerProps.put("transactional.id", "transaction");
        producerProps.put("transaction.timeout.ms", 12000);
        producerProps.put("max.block.ms", 12000);
        producerProps.put("batch.size", 60000);
        producerProps.put("retries", 3); // Number of retries before giving up
        producerProps.put("delivery.timeout.ms", 30000); // Delivery timeout: 30 seconds
        producerProps.put("request.timeout.ms", 30000); // Request timeout: 30 seconds
        // Note: the next two keys are broker-side settings (server.properties);
        // the producer client does not use them.
        producerProps.put("transaction.state.log.replication.factor", 1);
        producerProps.put("transaction.state.log.min.isr", 1);
        KafkaProducer<String, String> producer1 = null;
        try {
            producer1 = new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());
            // Initialize producer's transaction
            producer1.initTransactions(); // <-- hangs here, then times out
            System.out.println("initTransactions");
            .....
        }
    }
}
Output (while MyBroker1 is down):
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Overriding the default enable.idempotence to true since transactional.id is specified.
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Instantiated a transactional producer.
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Overriding the default acks to all since idempotence is enabled.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1695122942876
[main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Invoking InitProducerId for the first time in order to acquire a producer ID
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Cluster ID: m4HYe1sbTX6I9IScDjS04A
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)
......
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)
**Exception 2: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 1000milliseconds while awaiting InitProducerId**
**Broker appears to be down. Waiting and retrying...**
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)
It seems the Kafka transaction is never initialized. I have tried adding different properties but still cannot resolve the issue. The error occurs only when I use Kafka transactions.
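
For reference, here is a minimal sketch of how the client-side settings could be separated from the broker-side ones. It is an illustration under assumptions, not a confirmed fix. `bootstrap.servers` accepts a comma-separated list, so the sketch lists all three brokers; the host name `MyBroker3` is assumed, since only `MyBroker1` and `MyBroker2` appear above. The class name `ProducerConfigSketch` and the `build` helper are hypothetical. The two `transaction.state.log.*` keys are left out because they are broker settings that belong in each broker's `server.properties`.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds client-side producer settings only. Broker-side keys such as
    // transaction.state.log.replication.factor are deliberately not set here,
    // because they are read by the brokers, not by the producer client.
    static Properties build(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("enable.idempotence", "true");
        props.put("transactional.id", transactionalId);
        props.put("max.block.ms", "12000");
        return props;
    }

    public static void main(String[] args) {
        // MyBroker3 is an assumed host name; the post names only two brokers.
        Properties props = build(
                "MyBroker1:9092,MyBroker2:9092,MyBroker3:9092", "transaction");
        // Print the resulting settings (iteration order is not guaranteed).
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object could be passed to the `KafkaProducer` constructor in place of `producerProps`. Whether this changes the `initTransactions()` behaviour depends on how the `__transaction_state` topic is replicated on the brokers, which this sketch does not address.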