I am hosting a ksqlDB server locally and connecting it to a Kafka cluster hosted in Confluent Cloud. Without embedding Connect inside the ksqlDB server, this works.
However, as soon as I embed Connect inside the ksqlDB server, startup fails with the errors below. (The embedded setup also works if I connect to a ZooKeeper and broker running locally.)
ERROR Unhandled exception in server startup (io.confluent.ksql.rest.server.KsqlServerMain:96)
ksqldb-server | io.confluent.ksql.util.KsqlStatementException: Failed to check if exists for topic: _confluent-ksql-default__command_topic
ksqldb-server | Statement: CREATE STREAM KSQL_COMMANDS
(STATEMENT STRING) WITH(KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON', KAFKA_TOPIC='_confluent-ksql-default__command_topic');
Caused by: io.confluent.ksql.exception.KafkaResponseGetFailedException: Failed to check if exists for topic: _confluent-ksql-default__command_topic.
I have verified that both the above topics exist in the Confluent cluster I am connecting to.
Below is the Docker Compose file I am using:
---
version: '2'
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.19.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8089:8089"
    volumes:
      - "./confluent-hub-components/:/usr/share/kafka/plugins/"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8089"
      KSQL_AUTO_OFFSET_RESET: "earliest"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      # Confluent Cloud
      KSQL_BOOTSTRAP_SERVERS: "**********:9092"
      KSQL_SECURITY_PROTOCOL: SASL_SSL
      KSQL_SASL_MECHANISM: "PLAIN"
      KSQL_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******";
      KSQL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
      # Confluent Schema Registry
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "https://*******.confluent.cloud"
      KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
      KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: *******:*******
      # Configuration to embed Kafka Connect support
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "*******.confluent.cloud:9092"
      KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://*******.confluent.cloud"
      KSQL_CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: *******:*******
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
      # Connect worker
      KSQL_CONNECT_SECURITY_PROTOCOL: SASL_SSL
      KSQL_CONNECT_SASL_MECHANISM: "PLAIN"
      KSQL_CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******";
      KSQL_CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.19.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
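
The compose file passes the Kafka client settings twice: once with the KSQL_ prefix for ksqlDB itself and once with the KSQL_CONNECT_ prefix for the embedded Connect worker. One sanity check is to confirm that both sets point at the same cluster with the same security settings. Below is a minimal sketch of that comparison. It is only a diagnostic aid, not a known fix for the error above. The helper name `diff_client_settings` and the list of keys it compares are my own assumptions, taken from the variables in the file, and it only sees variables that are exported in the shell it runs in.

```python
import os

# Client settings that appear in the compose file under both prefixes.
# This list is an assumption drawn from that file, not an official one.
CLIENT_KEYS = [
    "BOOTSTRAP_SERVERS",
    "SECURITY_PROTOCOL",
    "SASL_MECHANISM",
    "SASL_JAAS_CONFIG",
    "SSL_ENDPOINT_IDENTIFICATION_ALGORITHM",
]


def diff_client_settings(env):
    """Return {key: (ksql_value, connect_value)} for every key that differs.

    `env` is any mapping of variable names to values, e.g. os.environ.
    A key missing under one prefix but present under the other is
    reported with None for the missing side.
    """
    mismatches = {}
    for key in CLIENT_KEYS:
        ksql_value = env.get("KSQL_" + key)
        connect_value = env.get("KSQL_CONNECT_" + key)
        if ksql_value != connect_value:
            mismatches[key] = (ksql_value, connect_value)
    return mismatches


if __name__ == "__main__":
    # Print only the key names so that credentials are not echoed.
    for key in diff_client_settings(os.environ):
        print("KSQL_" + key + " and KSQL_CONNECT_" + key + " differ")
```

An empty result only means the two sets of values are textually identical; it does not prove that either set is valid for the cluster.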