Error - Linking Connect running locally to Confluent Kafka Cluster

I am running Kafka Connect locally and connecting to a Confluent Kafka Cluster.

My scenario is as follows

  1. docker compose file that has image for MySQL and Kafka Connect
  2. debezium connector to read from tables and write to topic in Kafka Cluster

When I run the setup

  1. The history topic and other connect related topics are created in Confluent Cloud
  2. The table topics do not get created and I see the below error

connect | [2021-08-04 00:49:45,655] WARN [Producer clientId=connector-producer-calls_connector-0] Error while fetching metadata with correlation id 13009 : {call-center-db.call-center.calls=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)

Below is my docker compose file. (connect fails to create the table topics)

---
    version: '2'
    
    services:
      mysql:
        image: mysql:8.0.19
        hostname: mysql
        container_name: mysql
        ports:
          - "3306:3306"
        environment:
          MYSQL_ROOT_PASSWORD: mysql-pw
          MYSQL_DATABASE: call-center
          MYSQL_USER: example-user
          MYSQL_PASSWORD: example-pw
        volumes:
          - "./mysql/custom-config.cnf:/etc/mysql/conf.d/custom-config.cnf"

      connect:
        image: ${REPOSITORY}/cp-kafka-connect:${CONNECT_DOCKER_TAG}
        container_name: connect
        ports:
          - "8083:8083"
        volumes:
          - "./confluent-hub-components/:/usr/share/kafka/plugins/"
        environment:
          CONNECT_BOOTSTRAP_SERVERS: "***.cloud:9092"
          CONNECT_REST_ADVERTISED_HOST_NAME: connect
          CONNECT_REST_PORT: 8083
          CONNECT_GROUP_ID: compose-connect-group
          CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
          CONNECT_SECURITY_PROTOCOL: SASL_SSL
          CONNECT_SASL_MECHANISM: "PLAIN"
          CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";
          CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-configs"
          CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-offsets"
          CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-statuses"
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
          CONNECT_CLEANUP_POLICY: "COMPACT"
          CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
          CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
          CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
          CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://***.confluent.cloud"
          CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
          CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: ***:***
          CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins/"
          CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
          # CLASSPATH required due to CC-2422
          CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.1.1.jar

          #Connect Producer
          CONNECT_PRODUCER_BOOTSTRAP_SERVERS: "***.confluent.cloud:9092"
          CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
          CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
          CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";
          CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
          CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"

          #Connect Consumer
          CONNECT_CONSUMER_BOOTSTRAP_SERVERS: "***.confluent.cloud:9092"
          CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
          CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
          CONNECT_CONSUMER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";
          CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
          CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
          CONNECT_CONSUMER_AUTO_OFFSET_RESET: "earliest"

And the debezium connector is as follows. (which successfully creates the history topic)

curl -i -X POST -H "Accept:application/json" -H "Content-Type: application/json"  \
	http://localhost:8083/connectors \
	-d '{
			"name": "calls_connector",
			"config": {
				"connector.class": "io.debezium.connector.mysql.MySqlConnector",
   				"database.hostname": "mysql",
    			"database.port": "3306",
    			"database.user": "example-user",
    			"database.password": "example-pw",
			    "database.allowPublicKeyRetrieval": "true",
			    "database.server.id": "184054",
			    "database.server.name": "call-center-db",
			    "database.whitelist": "call-center",

			    "database.history.kafka.bootstrap.servers": "***.confluent.cloud:9092",
                "database.history.consumer.security.protocol": "SASL_SSL",
			    "database.history.consumer.ssl.endpoint.identification.algorithm": "https",
			    "database.history.consumer.sasl.mechanism": "PLAIN",
			    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"****\" password=\"***\";",
			    "database.history.producer.security.protocol": "SASL_SSL",
			    "database.history.producer.ssl.endpoint.identification.algorithm": "https",
			    "database.history.producer.sasl.mechanism": "PLAIN",
			    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"***\";",
			    
			    "database.history.kafka.topic": "call-center",
			    "table.whitelist": "call-center.calls",
			    "include.schema.changes": "false"
			}
	}' 

Even adding the below did not help to create the topic

CONNECT_TOPIC_CREATION_ENABLE: "true"
CONNECT_TOPIC_CREATION_DEFAULT_REPLICATION_FACTOR: 3
CONNECT_TOPIC_CREATION_DEFAULT_PARTITIONS: 3

My understanding is - Kafka Clusters do allow Connect (and hence the Connect producers) to create topics on the brokers in the cluster.

Is there anything I am missing?

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.