Bug? TimeoutException is thrown as IllegalStateException causing client shutdown

TimeoutException is thrown as IllegalStateException in org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded, which causes the client to shut down in org.apache.kafka.streams.KafkaStreams#getActionForThrowable.

Could this be a bug exposed by change KAFKA-12887, which was introduced in kafka-streams version 3.1.0?

A timeout should be a recoverable error that is expected to be handled by the user.
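
For context, this is the kind of recovery an application would normally configure via KafkaStreams#setUncaughtExceptionHandler. The sketch below is illustrative only (the class name is made up, and `streams` is assumed to be an already-built KafkaStreams instance): replace the failing stream thread when a timeout surfaces, otherwise shut down.

    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    public class TimeoutTolerantHandlerExample {

        // Replace the failing stream thread when a (possibly wrapped) TimeoutException
        // surfaces; shut the client down for everything else.
        static void installHandler(final KafkaStreams streams) {
            streams.setUncaughtExceptionHandler(exception -> {
                for (Throwable cause = exception; cause != null; cause = cause.getCause()) {
                    if (cause instanceof TimeoutException) {
                        return StreamThreadExceptionResponse.REPLACE_THREAD;
                    }
                }
                return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
            });
        }
    }

In the case reported here, though, such a handler is never consulted, because the timeout arrives wrapped in an IllegalStateException, which getActionForThrowable (quoted below) filters out before calling the user handler.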

org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded

    public boolean commitNeeded() {
        if (commitNeeded) {
            return true;
        } else {
            for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
                final TopicPartition partition = entry.getKey();
                try {
                    final long offset = mainConsumer.position(partition);
                    if (offset > entry.getValue() + 1) {
                        commitNeeded = true;
                        entry.setValue(offset - 1);
                    }
                } catch (final TimeoutException error) {
                    // the `consumer.position()` call should never block, because we know that we did process data
                    // for the requested partition and thus the consumer should have a valid local position
                    // that it can return immediately

                    // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
                    throw new IllegalStateException(error);
                } catch (final KafkaException fatal) {
                    throw new StreamsException(fatal);
                }
            }

            return commitNeeded;
        }
    }

org.apache.kafka.streams.KafkaStreams#getActionForThrowable

    private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
                                                                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
        if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
            action = SHUTDOWN_CLIENT;
        } else {
            action = streamsUncaughtExceptionHandler.handle(throwable);
        }
        return action;
    }

    private void handleStreamsUncaughtException(final Throwable throwable,
                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
                                                final boolean skipThreadReplacement) {
        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
        if (oldHandler) {
            log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
                    "The old handler will be ignored as long as a new handler is set.");
        }
        switch (action) {
            case REPLACE_THREAD:
                if (!skipThreadReplacement) {
                    log.error("Replacing thread in the streams uncaught exception handler", throwable);
                    replaceStreamThread(throwable);
                } else {
                    log.debug("Skipping thread replacement for recoverable error");
                }
                break;
            case SHUTDOWN_CLIENT:
                log.error("Encountered the following exception during processing " +
                        "and Kafka Streams opted to " + action + "." +
                        " The streams client is going to shut down now. ", throwable);
                closeToError();
                break;

As the comment in the code indicates, we should never hit a TimeoutException here. Can you reproduce the issue? It would be great if you could file a bug report with more details. Thanks!

Thanks, mjsax, for looking at this.

How can I log a bug please?

The assumption that TimeoutException will never be thrown for that piece of code may not be correct.

Here’s the stack trace seen for this issue on kafka-streams 3.1.0:

2022-06-22 13:58:35,796 ERROR thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
        at org.apache.kafka.streams.processor.internals.StreamTask.commitNeeded(StreamTask.java:1185)
        at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1111)
        at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1084)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1071)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:817)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
        ... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
2022-06-22 13:58:35,796  INFO thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] State transition from RUNNING to PENDING_ERROR
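
The cause chain in that log (StreamsException -> IllegalStateException -> TimeoutException) is exactly what trips the EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS check in getActionForThrowable quoted above, so the user-supplied handler never gets a chance to react. The snippet below is a rough, illustrative stand-in for that check (class and method names are made up, not the actual KafkaStreams internals):

    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.streams.errors.StreamsException;

    public class WrappedTimeoutDemo {

        // Rough stand-in for wrappedExceptionIsIn: does the wrapped cause belong to a
        // type that Kafka Streams refuses to hand to the user handler?
        static boolean wrappedCauseIsIllegalState(final Throwable throwable) {
            return throwable.getCause() instanceof IllegalStateException;
        }

        public static void main(final String[] args) {
            // Rebuild the chain from the stack trace above.
            final Throwable thrown = new StreamsException(
                new IllegalStateException(
                    new TimeoutException("Timeout expired before the position could be determined")));
            // Prints "true": getActionForThrowable picks SHUTDOWN_CLIENT in this case,
            // so the StreamsUncaughtExceptionHandler cannot choose REPLACE_THREAD.
            System.out.println(wrappedCauseIsIllegalState(thrown));
        }
    }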

You can file a ticket on the Apache Kafka Jira board: https://issues.apache.org/jira/browse/KAFKA

Thanks for sharing the stack trace (please add it to the ticket). Can you reproduce the issue? If yes, DEBUG logs would be super helpful to dig into it.

The assumption that TimeoutException will never be thrown for that piece of code may not be correct.

I believe the assumption itself is still correct. The question is, why is the assumption violated? Maybe there is some bug in tracking metadata and we incorrectly try to commit for a partition we don’t own any longer… :thinking: – As mentioned above, DEBUG level logs would be helpful to figure it out.

Ticket is raised: [KAFKA-14054] Unexpected client shutdown as TimeoutException is thrown as IllegalStateException - ASF JIRA

Unfortunately, we’re unable to reproduce this and that’s the only log we can find.

I believe the issue is definitely something worth looking into, especially as it forces the client to shut down. We have seen it a few times lately.

Thanks again @mjsax for your quick responses to this. Much appreciated :pray:

Hi,
is it OK to assume that “mainConsumer.position(partition)” should not do any I/O? After all, this is an API call. And why is it important to treat it as an IllegalStateException (a programmer error in the library)? Would it not be harmless to handle it as a timeout and recover from it automatically (recreate the thread)? What is the advantage of failing the client instead of recovering the thread?

Thanks for the reply,
Michal

The consumer maintains its position locally, and it would only call the broker to fetch the last committed offset to initialize its position if it has no valid local position. The assumption was that the consumer should always have a valid position at this point, would never make an RPC to the broker, and thus could never hit a TimeoutException. – If we did hit a TimeoutException, something would be incorrect, and instead of masking the error, we want to surface it to fix the actual root cause.
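
To make that concrete, here is a standalone sketch (broker address, group id, and topic name are made up) of the one situation where position() does need a broker round trip and can therefore time out: an assigned partition for which the consumer has no local position yet.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PositionTimeoutExample {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "position-demo");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                final TopicPartition tp = new TopicPartition("demo-topic", 0);
                consumer.assign(Collections.singleton(tp));
                // No poll() or seek() has happened yet, so there is no local position and
                // this call has to ask the broker; if the broker is slow or unreachable it
                // fails with the same TimeoutException as in the stack trace above.
                final long position = consumer.position(tp, Duration.ofSeconds(5));
                System.out.println("position = " + position);
            } catch (final TimeoutException e) {
                System.out.println("Position could not be determined in time: " + e.getMessage());
            }
        }
    }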

However, it turns out that the assumption was incorrect: it’s not guaranteed that the consumer has a valid local position, and an RPC to the broker may happen. The right fix is to just log and swallow the exception and move on.
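
For illustration, a minimal sketch of what that "log and swallow" could look like in the commitNeeded() method quoted earlier. This is not the actual patch; consumedOffsets, mainConsumer, and commitNeeded mirror the quoted snippet, and log is assumed to be the task's SLF4J logger.

    public boolean commitNeeded() {
        if (commitNeeded) {
            return true;
        } else {
            for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
                final TopicPartition partition = entry.getKey();
                try {
                    final long offset = mainConsumer.position(partition);
                    if (offset > entry.getValue() + 1) {
                        commitNeeded = true;
                        entry.setValue(offset - 1);
                    }
                } catch (final TimeoutException swallow) {
                    // The consumer had no valid local position and the broker RPC timed out:
                    // skip this partition for now and let a later commit attempt retry,
                    // instead of escalating to a fatal IllegalStateException.
                    log.debug("Could not get the position for partition {}; will retry on the next commit attempt.",
                              partition, swallow);
                } catch (final KafkaException fatal) {
                    throw new StreamsException(fatal);
                }
            }

            return commitNeeded;
        }
    }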

Hi,
good to hear that there will be a fix.
I saw the PR. Should the same not be done in this method (it has similar code):
private Long findOffset(final TopicPartition partition) {

Thanks,
Michal

Might be best to comment on the PR?