My transactional producer intermittently receives the following exception while committing a transaction, and I can’t figure out where the interrupt is coming from (it certainly doesn’t seem to come from my code):
Feb 04 05:03:38.172 Unexpected transient exception, re-trying transaction
org.apache.kafka.common.errors.InterruptException: Received interrupt while awaiting EndTxn(true)
at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:67) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:791) ~[?:?]
at net.company.kafka.producer.KafkaProducerImpl.commitTransaction(KafkaProducerImpl.kt:135) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1081) ~[?:?]
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1369) ~[?:?]
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:278) ~[?:?]
at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:56) ~[?:?]
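The frames above show commitTransaction() running inside a ScheduledThreadPoolExecutor task and blocked in CountDownLatch.await() when the InterruptedException arrives. One way a thread in that position can be interrupted is a Future.cancel(true) or shutdownNow() on the executor that owns it. Whether anything like that happens in this application is an assumption, not something the trace shows. The self-contained sketch below only reproduces the same frame shape: the latch stands in for the pending EndTxn response, and CancelInterruptsLatchWait is a made-up name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class CancelInterruptsLatchWait {
    // Returns true if the scheduled task saw an InterruptedException while blocked on the latch.
    static boolean run() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch neverReleased = new CountDownLatch(1); // stand-in for the EndTxn response
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        boolean[] interrupted = {false};

        ScheduledFuture<?> future = executor.schedule(() -> {
            started.countDown();
            try {
                neverReleased.await(); // blocks the worker thread, like TransactionalRequestResult.await()
            } catch (InterruptedException e) {
                interrupted[0] = true;
            } finally {
                finished.countDown();
            }
        }, 0, TimeUnit.MILLISECONDS);

        started.await();
        future.cancel(true); // mayInterruptIfRunning = true interrupts the running worker thread
        finished.await(5, TimeUnit.SECONDS);
        executor.shutdown();
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("task interrupted: " + run());
    }
}
```

Running it should print "task interrupted: true", with the InterruptedException raised from the same CountDownLatch.await() frame seen in the trace.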
According to the docs, an InterruptException is retryable (just once); if the transaction fails again, the producer must be closed and re-created from scratch. I’m receiving the above exception during the retry as well, so I proceed to close the producer and re-create it. The producer itself, however, logs the following at ERROR log level:
Feb 04 05:03:38.173 [Producer clientId=my-client-id-producer, transactionalId=my-transactional-id-producer-797984486] Interrupted while joining ioThread
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:?]
at java.lang.Thread.join(Thread.java:1308) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1313) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1290) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1266) ~[?:?]
at net.company.kafka.producer.KafkaProducerImpl.closeProducer(KafkaProducerImpl.kt:171) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
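The retry-then-recreate flow described above might look roughly like this in plain Java. TxProducer, CommitWithSingleRetry and RetryDemo are hypothetical names, not Kafka classes. The sketch catches RuntimeException as a stand-in for the client's InterruptException so that it compiles without kafka-clients, and it follows the single-retry rule stated in the question rather than any documented limit.

```java
import java.util.function.Supplier;

// Hypothetical minimal abstraction over a transactional producer; NOT the Kafka client API.
interface TxProducer extends AutoCloseable {
    void commitTransaction(); // may throw an unchecked exception, e.g. Kafka's InterruptException

    @Override
    void close(); // narrowed: no checked exception
}

// Hypothetical helper: one commit attempt plus a single retry, then close and re-create.
final class CommitWithSingleRetry {
    private final Supplier<TxProducer> factory; // builds a fresh producer instance
    private TxProducer producer;

    CommitWithSingleRetry(Supplier<TxProducer> factory) {
        this.factory = factory;
        this.producer = factory.get();
    }

    /** Returns true if a commit attempt succeeded, false if the producer was re-created. */
    boolean commit() {
        for (int attempt = 1; attempt <= 2; attempt++) { // first try + one retry
            try {
                producer.commitTransaction();
                return true;
            } catch (RuntimeException e) { // stand-in for InterruptException (assumption)
                System.err.println("commit attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        // Both attempts failed: close this producer and start over with a new one.
        producer.close();
        producer = factory.get();
        return false;
    }
}

class RetryDemo {
    public static void main(String[] args) {
        int[] created = {0};
        CommitWithSingleRetry committer = new CommitWithSingleRetry(() -> {
            created[0]++;
            return new TxProducer() {
                @Override
                public void commitTransaction() {
                    throw new IllegalStateException("simulated interrupt");
                }

                @Override
                public void close() { }
            };
        });
        System.out.println(committer.commit() + ", producers created: " + created[0]);
    }
}
```

With a producer that always fails, the demo prints "false, producers created: 2". In real code you would catch the specific exception types the client documents for commitTransaction() instead of RuntimeException.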
The Kafka broker logs for this producer don’t say much (at least I couldn’t find anything interesting):
Feb 04 05:03:06.504 [TransactionCoordinator id=0] Initialized transactionalId my-transactional-id-producer-797984486 with producerId 120 and producer epoch 0 on partition __transaction_state-16
Feb 04 05:03:39.497 [TransactionCoordinator id=0] Initialized transactionalId my-transactional-id-producer-797984486 with producerId 120 and producer epoch 1 on partition __transaction_state-16
Feb 04 05:03:40.707 [TransactionCoordinator id=0] Initialized transactionalId my-transactional-id-producer-797984486 with producerId 120 and producer epoch 2 on partition __transaction_state-16
Why is the original transaction being interrupted, and is there anything I can do to prevent it? Moreover, is it safe to ignore the message logged at ERROR level? If so, shouldn’t it be logged at WARN instead?
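The "Interrupted while joining ioThread" entry is consistent with the worker thread running closeProducer() still having its interrupt status set when close() was called, so that the join on the producer's I/O thread failed immediately. This is an assumption; the logs above don't show the thread's interrupt status. The JDK behaviour involved can be seen in isolation in the sketch below, where PendingInterruptDemo and its helper thread are hypothetical stand-ins, not Kafka code.

```java
import java.util.concurrent.CountDownLatch;

class PendingInterruptDemo {
    // Returns true if Thread.join() threw because the caller's interrupt status was already set.
    static boolean joinFailsWhenAlreadyInterrupted() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Thread ioThread = new Thread(() -> { // hypothetical stand-in for a background I/O thread
            try {
                release.await(); // stays alive until released
            } catch (InterruptedException ignored) {
                // not expected here: nothing interrupts this helper thread
            }
        });
        ioThread.start();

        Thread.currentThread().interrupt(); // simulate an interrupt that nobody has cleared
        boolean threw;
        try {
            ioThread.join(); // a pending interrupt makes this throw instead of waiting
            threw = false;
        } catch (InterruptedException e) {
            threw = true; // throwing InterruptedException clears the interrupt status
        }

        Thread.interrupted(); // ensure no interrupt leaks out of this method either way
        release.countDown();
        ioThread.join(); // now waits normally for the helper to finish
        return threw;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("join threw: " + joinFailsWhenAlreadyInterrupted());
    }
}
```

Running it should print "join threw: true": the join gives up immediately without waiting for the helper thread, which mirrors how close() can skip waiting for its I/O thread when called from an interrupted thread.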