Flink Kafka connector fails to fetch topic metadata

I am new to Flink and am trying out a job that reads data from Kafka and writes it to ClickHouse. For this I have written a Java program that uses Kafka as the source. After compiling the code and submitting the job through the Flink UI, I get the error below.

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
    at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
    at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:140)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:324)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:669)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:460)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:460)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:225)
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Kafka Source -> Sink: Unnamed' (operator cbc357ccb763df2852fee8c4fc7d55f2).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:651)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:259)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:432)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:445)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to 
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:248)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    ... 6 more
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [mytopic].
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:233)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeTopics
    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
    ... 9 more
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeTopics

Below is the piece of code I am using to connect Flink to Kafka:

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("boostrap.server:32100")
        .setTopics(parameter.get("topics", "newtopic"))
        .setGroupId(parameter.get("group", "mygroup"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("security.protocol", "SASL_SSL")
        .setProperty("ssl.truststore.location", "/opt/flink/kafkatruststore.jks")
        .setProperty("ssl.truststore.password", "changeit")
        .setProperty("ssl.keystore.location", "/opt/flink/kafkatruststore.jks")
        .setProperty("ssl.keystore.password", "changeit")
        .setProperty("sasl.mechanism", "SCRAM-SHA-256")
        .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
        .build();

// Read data from Kafka
DataStream<String> text = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
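
To narrow this down, I am planning to run a plain Kafka AdminClient with the same security settings from inside the cluster and call describeTopics directly, since that is the call that times out in the trace above. A minimal sketch (the bootstrap address, topic name, paths and credentials are just the placeholders from my config, not real values):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaMetadataCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same connection settings the KafkaSource above is built with
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "boostrap.server:32100");
        props.put("security.protocol", "SASL_SSL");
        props.put("ssl.truststore.location", "/opt/flink/kafkatruststore.jks");
        props.put("ssl.truststore.password", "changeit");
        props.put("ssl.keystore.location", "/opt/flink/kafkatruststore.jks");
        props.put("ssl.keystore.password", "changeit");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"username\" password=\"password\";");

        try (AdminClient admin = AdminClient.create(props)) {
            // Same metadata call that fails inside KafkaSubscriberUtils.getTopicMetadata.
            // allTopicNames() is available in recent kafka-clients; older versions expose all().
            System.out.println(
                    admin.describeTopics(Collections.singletonList("mytopic"))
                            .allTopicNames()
                            .get());
        }
    }
}

If this fails with the same TimeoutException, the problem is presumably in the broker address, the SASL/SSL settings, or network access from the pod, rather than in the Flink job itself.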


Both Kafka and Flink are deployed in a Kubernetes cluster, and I am using the Flink UI to submit the job.
I cannot find any satisfactory resolution anywhere. Can anyone help me debug this issue?

Note: my Kafka cluster is SSL secured.
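
Since the failing metadata lookup happens in the source coordinator (which, as far as I understand, runs on the JobManager), I also want to confirm that the JKS file referenced above is mounted and readable in the JobManager pod, not only in the TaskManager pods. A trivial check I can run there (path copied from my config):

import java.nio.file.Files;
import java.nio.file.Paths;

public class TruststoreCheck {
    public static void main(String[] args) {
        // Path taken from the KafkaSource configuration above
        String path = "/opt/flink/kafkatruststore.jks";
        System.out.println(path + " readable: " + Files.isReadable(Paths.get(path)));
    }
}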

Hi there! Can you also share the code in which you are constructing an instance of a KafkaSink and pointing your application to clickhouse?