I’m working on a project that uses Kafka for communication. One of the business messages is associated with a payment, so I configured the Kafka client with exactly-once semantics for that topic.
While testing my application, I noticed that if the Kafka broker is down or the topic is deleted, a timeout is thrown and the client fails to deliver the message. However, when the broker is up again, the client doesn’t recover automatically and enters a loop of error messages: I can neither commit nor abort the transaction.
In the previous version (2.8.2), the client reconnected automatically.
I reproduced the error with this code:
```java
import java.time.Duration;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalMessageProducer {

    public static void main(String[] args) throws Exception {
        // createKafkaProducer() is not shown; it builds a KafkaProducer with a transactional.id
        KafkaProducer<String, String> producer = createKafkaProducer();
        producer.initTransactions();
        String topicName = "topico-de-prueba";
        int i = 0;
        while (true) {
            sendMsg(producer, topicName, "msg-" + i);
            Thread.sleep(Duration.ofSeconds(1)); // Thread.sleep(Duration) requires Java 19+
            i++;
        }
    }

    private static void sendMsg(KafkaProducer<String, String> producer, String topic, String msg) {
        try {
            System.out.println("Begin transaction");
            producer.beginTransaction();
            System.out.println("Send message: " + msg);
            producer.send(new ProducerRecord<>(topic, null, msg));
            System.out.println("Commit transaction");
            producer.commitTransaction();
        } catch (Exception e) {
            System.out.println("ERROR1: can't send the message " + e.getClass().getName());
            e.printStackTrace();
            System.out.println(e.getMessage());
            System.out.println("*********************************************************************************");
            try {
                // After a commitTransaction() timeout this throws IllegalStateException (see the output)
                producer.abortTransaction();
            } catch (Exception e2) {
                System.out.println("ERROR2: can't abort");
                e2.printStackTrace();
            }
        }
    }
}
```
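The question doesn’t show `createKafkaProducer()`. As a hedged illustration only (not the author’s code), a hypothetical helper `transactionalProps` could collect the settings a transactional producer needs; the broker address, the `transactional.id` value, and the explicit `enable.idempotence` and `max.block.ms` entries are assumptions. Note that 60000 ms, the value in the timeout messages of the output, is the default for `max.block.ms`, which bounds how long `commitTransaction()` blocks.

```java
import java.util.Properties;

class TransactionalProducerConfig {

    // Hypothetical helper: the author's createKafkaProducer() is not shown,
    // so every value here is an assumption for illustration.
    static Properties transactionalProps(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Required before calling initTransactions(); a transactional.id implies idempotence.
        props.put("transactional.id", transactionalId);
        props.put("enable.idempotence", "true");
        // Upper bound for blocking calls such as commitTransaction(); 60000 ms is the default,
        // which matches the "Timeout expired after 60000ms" messages.
        props.put("max.block.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = transactionalProps("localhost:9092", "payment-producer-1");
        System.out.println(props.getProperty("transactional.id"));
    }
}
```

The resulting `Properties` would be passed to `new KafkaProducer<>(props)`. Raising `max.block.ms` only lengthens each wait; on its own it does not get the producer out of the state shown in the output.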
The output:
```text
Send message: msg-33
Commit transaction
ERROR1: can't send the message org.apache.kafka.common.errors.TimeoutException
Timeout expired after 60000ms while awaiting EndTxn(true)
*********************************************************************************
ERROR2: can't abort
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting EndTxn(true)
java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` because the previous call to `commitTransaction` timed out and must be retried
	at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1109)
	at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
	at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
	at coop.tecso.kafka.TransactionalMessageProducer.sendMsg(TransactionalMessageProducer.java:54)
	at coop.tecso.kafka.TransactionalMessageProducer.main(TransactionalMessageProducer.java:33)
Begin transaction
ERROR1: can't send the message java.lang.IllegalStateException
Cannot attempt operation `beginTransaction` because the previous call to `commitTransaction` timed out and must be retried
*********************************************************************************
ERROR2: can't abort
java.lang.IllegalStateException: Cannot attempt operation `beginTransaction` because the previous call to `commitTransaction` timed out and must be retried
	at org.apache.kafka.clients.producer.internals.TransactionManager.throwIfPendingState(TransactionManager.java:1091)
	at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:252)
	at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:658)
	at coop.tecso.kafka.TransactionalMessageProducer.sendMsg(TransactionalMessageProducer.java:43)
	at coop.tecso.kafka.TransactionalMessageProducer.main(TransactionalMessageProducer.java:33)
java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` because the previous call to `commitTransaction` timed out and must be retried
	at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1109)
	at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
	at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
	at coop.tecso.kafka.TransactionalMessageProducer.sendMsg(TransactionalMessageProducer.java:54)
	at coop.tecso.kafka.TransactionalMessageProducer.main(TransactionalMessageProducer.java:33)
```
**What is the best strategy for handling this kind of error?**
How can I recover from the timeout?
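The `IllegalStateException` in the log says the timed-out `commitTransaction()` "must be retried", and the javadoc of `KafkaProducer.commitTransaction()` describes retrying after a `TimeoutException` as safe, a different operation such as `abortTransaction()` as not allowed at that point, and closing the producer as the only alternative to retrying. A minimal sketch of that strategy, assuming a bounded number of commit retries followed by closing the producer and creating a new one with the same `transactional.id`. To keep it runnable without `kafka-clients`, `TxProducer`, `TxTimeoutException`, `SimulatedProducer` and `RecoveringSender` are hypothetical stand-ins for `KafkaProducer`, `org.apache.kafka.common.errors.TimeoutException`, a broker outage, and the application code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException (unchecked, like the real one). */
class TxTimeoutException extends RuntimeException {
    TxTimeoutException(String message) { super(message); }
}

/** Hypothetical subset of the KafkaProducer transactional API used by this sketch. */
interface TxProducer {
    void initTransactions();
    void beginTransaction();
    void send(String topic, String value);
    void commitTransaction();
    void close();
}

/** Hypothetical fake: when brokerDown is true, every commit times out like the EndTxn(true) call in the log. */
class SimulatedProducer implements TxProducer {
    final boolean brokerDown;
    int commitCalls = 0;
    boolean closed = false;

    SimulatedProducer(boolean brokerDown) { this.brokerDown = brokerDown; }

    public void initTransactions() { }
    public void beginTransaction() { }
    public void send(String topic, String value) { }
    public void commitTransaction() {
        commitCalls++;
        if (brokerDown) {
            throw new TxTimeoutException("Timeout expired while awaiting EndTxn(true)");
        }
    }
    public void close() { closed = true; }
}

class RecoveringSender {
    private final Supplier<TxProducer> factory;
    private final int commitAttempts;
    private TxProducer producer;

    RecoveringSender(Supplier<TxProducer> factory, int commitAttempts) {
        this.factory = factory;
        this.commitAttempts = commitAttempts;
        this.producer = newProducer();
    }

    private TxProducer newProducer() {
        TxProducer p = factory.get();
        // With a real KafkaProducer and the same transactional.id, initTransactions()
        // completes or aborts whatever transaction the previous instance left open.
        p.initTransactions();
        return p;
    }

    /** Returns true if the commit succeeded, false if the producer had to be replaced (outcome unknown). */
    boolean send(String topic, String value) {
        producer.beginTransaction();
        producer.send(topic, value);
        for (int attempt = 1; attempt <= commitAttempts; attempt++) {
            try {
                producer.commitTransaction();
                return true;
            } catch (TxTimeoutException e) {
                // After a commit timeout only commitTransaction() may be called again;
                // abortTransaction() and beginTransaction() throw IllegalStateException (see the log).
                System.out.println("Commit attempt " + attempt + " timed out");
            }
        }
        // Retries exhausted: close this producer and start over with a fresh one.
        producer.close();
        producer = newProducer();
        return false;
    }

    public static void main(String[] args) {
        List<SimulatedProducer> created = new ArrayList<>();
        // The first producer sees the outage; producers created later see a healthy broker.
        Supplier<TxProducer> factory = () -> {
            SimulatedProducer p = new SimulatedProducer(created.isEmpty());
            created.add(p);
            return p;
        };
        RecoveringSender sender = new RecoveringSender(factory, 3);
        System.out.println(sender.send("topico-de-prueba", "msg-0"));
        System.out.println(sender.send("topico-de-prueba", "msg-1"));
    }
}
```

Running `main` prints three timeout lines, then `false` (the first producer was closed and replaced), then `true` (the replacement commits). Two caveats: a commit that timed out may still have succeeded on the broker, so when `send` returns `false` the fate of that payment message is unknown and the caller must decide whether re-sending is acceptable; and while the broker is still down, `initTransactions()` on the new producer can itself block and time out, which real code would also need to handle.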