Error - Kafka Connect embedded inside KSQL connecting to Confluent Cloud Cluster

I am hosting a ksqlDB server locally and connecting it to a Kafka cluster hosted in Confluent Cloud. Without embedding Connect inside the ksqlDB server, this works.

However, as soon as I embed Connect inside the ksqlDB server, startup fails with the errors below. (The same setup also works when I connect to ZooKeeper and a broker running locally.)

    ERROR Unhandled exception in server startup (io.confluent.ksql.rest.server.KsqlServerMain:96)
    ksqldb-server | io.confluent.ksql.util.KsqlStatementException: Failed to check if exists for topic: _confluent-ksql-default__command_topic
    ksqldb-server | Statement: CREATE STREAM KSQL_COMMANDS (STATEMENT STRING) WITH(KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON', KAFKA_TOPIC='_confluent-ksql-default__command_topic');

    Caused by: io.confluent.ksql.exception.KafkaResponseGetFailedException: Failed to check if exists for topic: _confluent-ksql-default__command_topic.

I have verified that both the above topics exist in the Confluent cluster I am connecting to.

Below is the Docker Compose file I am using:

    ---
    version: '2'
    
    services:
      ksqldb-server:
        image: confluentinc/ksqldb-server:0.19.0
        hostname: ksqldb-server
        container_name: ksqldb-server
        ports:
          - "8089:8089"
        volumes:
          - "./confluent-hub-components/:/usr/share/kafka/plugins/"
        environment:
          KSQL_LISTENERS: "http://0.0.0.0:8089"
          KSQL_AUTO_OFFSET_RESET: "earliest"
          KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
          KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
          # Confluent Cloud 
          KSQL_BOOTSTRAP_SERVERS: "**********:9092"
          KSQL_SECURITY_PROTOCOL: SASL_SSL
          KSQL_SASL_MECHANISM: "PLAIN"
          KSQL_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******";
          KSQL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
          # Confluent Schema Registry
          KSQL_KSQL_SCHEMA_REGISTRY_URL: "https://*******.confluent.cloud"
          KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
          KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "*******:*******"
          # Configuration to embed Kafka Connect support
          KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
          KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
          KSQL_CONNECT_BOOTSTRAP_SERVERS: "*******.confluent.cloud:9092"
          KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
          KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
          KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://*******.confluent.cloud"
          KSQL_CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
          KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "*******:*******"
          KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-configs"
          KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-offsets"
          KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-statuses"
          KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
          KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
          KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
          KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
          # Connect worker
          KSQL_CONNECT_SECURITY_PROTOCOL: SASL_SSL
          KSQL_CONNECT_SASL_MECHANISM: "PLAIN"
          KSQL_CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******";
          KSQL_CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"

      ksqldb-cli:
        image: confluentinc/ksqldb-cli:0.19.0
        container_name: ksqldb-cli
        depends_on:
          - ksqldb-server
        entrypoint: /bin/sh
        tty: true
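
One gap worth noting in the file above: it sets security only on the Connect worker itself. In Kafka Connect, the producers and consumers that the worker creates for connectors take their own settings under the `producer.` and `consumer.` prefixes. The fragment below is a sketch of how that might look here, assuming the `KSQL_CONNECT_` prefix maps onto those worker properties in the same way as the other settings. The variable names are derived from that assumption, and this may well be unrelated to the startup errors.

```yaml
services:
  ksqldb-server:
    environment:
      # Assumption: KSQL_CONNECT_PRODUCER_* and KSQL_CONNECT_CONSUMER_* are
      # translated to the worker's producer.* and consumer.* properties.
      # Producers the worker creates for source connectors
      KSQL_CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      KSQL_CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      KSQL_CONNECT_PRODUCER_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******";'
      KSQL_CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
      # Consumers the worker creates for sink connectors
      KSQL_CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      KSQL_CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      KSQL_CONNECT_CONSUMER_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******";'
      KSQL_CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
```

These entries would go under the existing `environment` key of the `ksqldb-server` service, next to the `KSQL_CONNECT_SASL_*` settings.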

After changing the following settings from 1 to 3, I was able to get past the above error:

    KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
    KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
    KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
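
For clarity, this is roughly how the internal-topic settings look in the `environment` block after the change. My understanding, which I have not confirmed against the documentation, is that Confluent Cloud only accepts a replication factor of 3 for topics, which would explain why the Connect worker could not create its internal topics with a factor of 1.

```yaml
services:
  ksqldb-server:
    environment:
      # Internal topics used by the embedded Connect worker (names unchanged)
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-statuses"
      # Raised from 1 to 3; the reason given above is an assumption
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
```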

The error I am seeing now is below. I am not sure where in the Docker Compose file to look for the cause.

    ERROR Unhandled exception in server startup (io.confluent.ksql.rest.server.KsqlServerMain:96)
    ksqldb-server | io.confluent.ksql.util.KsqlException: Failed to start API server
    ksqldb-server | at io.confluent.ksql.api.server.Server.start(Server.java:166)
    ksqldb-server | at io.confluent.ksql.rest.server.KsqlRestApplication.startAsync(KsqlRestApplication.java:365)
    ksqldb-server | at io.confluent.ksql.rest.server.MultiExecutable.doAction(MultiExecutable.java:68)
    ksqldb-server | at io.confluent.ksql.rest.server.MultiExecutable.startAsync(MultiExecutable.java:42)
    ksqldb-server | at io.confluent.ksql.rest.server.KsqlServerMain.tryStartApp(KsqlServerMain.java:92)
    ksqldb-server | at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:67)
    ksqldb-server | Caused by: java.lang.InterruptedException
    ksqldb-server | at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:385)
    ksqldb-server | at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    ksqldb-server | at io.confluent.ksql.api.server.Server.start(Server.java:161)
