Kafka Consumer with pattern subscriptions fails on topic deletion

Let me describe my use case first:
I have an application which maintains a pool of empty target Kafka topics (let's say tktA1, tktA2, …, tktAN). It also consumes a source topic (skt) and, after some filtering and transformation, routes messages to the target topics. Which target topic is chosen depends on business logic, but messages of one specific type always go to one specific topic, and when I get a new message type, one of the topics from the pool is taken and assigned to that type.
So the Kafka cluster ends up containing a lot of topics with similar names. Some of them have a non-zero high water mark; some haven't been used yet. When I redeploy the application, the unused topics from the previous pool are deleted and a new pool of Kafka topics is created (tktB1, tktB2, …, tktBN).
On the other hand, I have a Kafka Connect cluster with a sink connector. It consumes topics by a pattern that matches topics from both the old and the new pool (for example “tkt.*”).
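For reference, the connector is configured roughly like this (the connector class and name are placeholders; the relevant part is the `topics.regex` pattern subscription):

```json
{
  "name": "tkt-sink",
  "config": {
    "connector.class": "com.example.SomeSinkConnector",
    "tasks.max": "2",
    "topics.regex": "tkt.*"
  }
}
```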

Right after redeploying the application, some sink connector tasks fail with a TimeoutException and I have to manually restart them. I’ve tracked the exception down to the following method in KafkaConsumer:

    public long position(TopicPartition partition, final Duration timeout) {
        acquireAndEnsureOpen();
        try {
            if (!this.subscriptions.isAssigned(partition))
                throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");

            Timer timer = time.timer(timeout);
            do {
                SubscriptionState.FetchPosition position = this.subscriptions.validPosition(partition);
                if (position != null)
                    return position.offset;

                updateFetchPositions(timer);
                client.poll(timer);
            } while (timer.notExpired());

            throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
                    "for partition " + partition + " could be determined");
        } finally {
            release();
        }
    }

If I read it correctly, it only succeeds if the “partition” of the deleted topic has a valid “position”, which is unlikely given that the topic hasn’t been used yet. I can probably mitigate this by setting metadata.max.age.ms to something very low, but that doesn’t guarantee anything.
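As a sketch of that mitigation (the broker address and group id are placeholders; this only builds the consumer config and doesn’t connect to anything):

```java
import java.util.Properties;

public class SinkConsumerConfigSketch {
    // Build consumer properties with a lowered metadata refresh interval.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "tkt-sink");                // placeholder
        // metadata.max.age.ms defaults to 300000 (5 minutes); refreshing
        // metadata more often lets the consumer notice deleted and newly
        // created topics sooner, at the cost of extra metadata requests.
        props.put("metadata.max.age.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps());
    }
}
```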

Could you help me answer the following questions?

  • Does consumer subscription by pattern support topic deletion? As far as I can see, newly added topics are picked up correctly.
  • Is this a bug or expected behaviour?
  • Do you have any ideas on how to avoid this exception? Maybe besides metadata.max.age.ms I need to fine-tune some other timeouts …