I have a Kafka Streams app and I’m implementing a production exception handler for messages that are too large for the destination topic. I noticed that if I return FAIL from the handler and then restart my app, it will go into an infinite loop without ever calling the exception handler if the batch size > 1. The issue appears to be in org.apache.kafka.clients.producer.internals.Sender#completeBatch (kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java at 3f8e86ab674e49784b2b21ea91a822728f8f577e · apache/kafka · GitHub), where it tries to split the batch if the message is too large and the batch size > 1. I assume this is in case one of the other messages in the batch is not too large and can be processed successfully. In my case, the batchSize is not reduced by splitBatch and it is re-enqueued with the same number of records. This results in an infinite loop and eventually a stack overflow.
I’m not sure why the batch isn’t split and I also don’t understand why my custom exception handler is never called.
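For context, the handler is essentially the following (a simplified sketch with a made-up class name; the real one does a bit more):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class FailOnTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // This is the case I care about: stop the application and surface the error.
            return ProductionExceptionHandlerResponse.FAIL;
        }
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure
    }
}

It is registered through default.production.exception.handler (StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG).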
I am not sure whether KIP-1065 would address your issue (at least partially), but it would be great if you could try out 4.0.0 and let us know.
A RecordTooLargeException is not retriable, and it seems the handler was originally called with this error, since you said you returned FAIL. I am not 100% sure, though, what happens after the restart…
it will go into an infinite loop without ever calling the exception handler if the batch size > 1
Not 100% sure what you mean by this.
I assume this is in case one of the other messages in the batch is not too large and can be processed successfully. In my case, the batchSize is not reduced by splitBatch and it is re-enqueued with the same number of records.
I am not a producer expert. Can it be that there is a single message that is larger than batch.size, and thus, it’s not possible to split this single message? Otherwise, it would sound like a bug in the producer to me.
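For reference (with the same caveat that I am not a producer expert), the limits involved here are separate settings: as far as I can tell, the split targets the configured batch.size, while the MESSAGE_TOO_LARGE response itself comes from the broker/topic limit. A rough illustration with placeholder values, not a recommendation:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class SizeLimits {
    // Placeholder values; the point is only that these are different limits.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);          // target size a too-large batch is split down to
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);  // client-side cap on a single request / record
        // Broker/topic side: message.max.bytes (broker) or max.message.bytes (topic)
        // is what leads to the MESSAGE_TOO_LARGE produce response.
        return props;
    }
}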
Do you see the corresponding WARN log?
Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}
Or some other WARN log that can help to see what’s going on?
Yes, I’m seeing that warning in the logs. This is the section of code I’m referring to:
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo) {
    Errors error = response.error;
    if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone()
            && (batch.magic() >= 2 || batch.isCompressed())) {
        this.log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
                new Object[]{correlationId, batch.topicPartition, this.retries - batch.attempts(), this.formatErrMsg(response)});
        if (this.transactionManager != null) {
            this.transactionManager.removeInFlightBatch(batch);
        }
        this.accumulator.splitAndReenqueue(batch);
        this.maybeRemoveAndDeallocateBatch(batch);
        this.sensors.recordBatchSplit();
    }
If there is a consumer lag of 2 records when I start the app and I get a “record too large” exception, it falls into this code. If I follow the execution of splitAndReenqueue(), it doesn’t successfully split the batch. It just re-enqueues the same batch of 2 records and ends up at this code again without ever calling my producer exception handler.
I can try testing with the 4.0.0 release, but as you mentioned, this isn’t a retriable error.
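In the meantime, one thing I can try in order to narrow it down is to send a single oversized record with a plain producer and check whether the single-record path fails the batch and hands a RecordTooLargeException to the callback instead of looping. Rough sketch, with made-up topic name and sizes:

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class OversizedRecordCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        // Assumed to exceed the destination topic's max.message.bytes (lowered for this test)
        // while staying under the client's max.request.size, so the error comes back from the broker.
        byte[] oversized = new byte[512 * 1024];

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(
                new ProducerRecord<>("my-output-topic", "key".getBytes(StandardCharsets.UTF_8), oversized),
                (metadata, exception) -> {
                    if (exception != null) {
                        // With a single-record batch I would expect the exception here rather than a retry loop.
                        System.err.println("send failed: " + exception);
                    }
                });
            producer.flush();
        }
    }
}

If that single-record case behaves as expected, the problem really does seem specific to the split path for batches with more than one record.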