I am running Kafka Connect locally and connecting to a Confluent Kafka Cluster.
My scenario is as follows
- docker compose file that has image for MySQL and Kafka Connect
- debezium connector to read from tables and write to topic in Kafka Cluster
When I run the setup
- The history topic and other connect related topics are created in Confluent Cloud
- The table topics do not get created and I see the below error
connect | [2021-08-04 00:49:45,655] WARN [Producer clientId=connector-producer-calls_connector-0] Error while fetching metadata with correlation id 13009 : {call-center-db.call-center.calls=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
Below is my docker compose file. (connect fails to create the table topics)
---
version: '2'
services:
mysql:
image: mysql:8.0.19
hostname: mysql
container_name: mysql
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: mysql-pw
MYSQL_DATABASE: call-center
MYSQL_USER: example-user
MYSQL_PASSWORD: example-pw
volumes:
- "./mysql/custom-config.cnf:/etc/mysql/conf.d/custom-config.cnf"
connect:
image: ${REPOSITORY}/cp-kafka-connect:${CONNECT_DOCKER_TAG}
container_name: connect
ports:
- "8083:8083"
volumes:
- "./confluent-hub-components/:/usr/share/kafka/plugins/"
environment:
CONNECT_BOOTSTRAP_SERVERS: "***.cloud:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_MECHANISM: "PLAIN"
CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";
CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-configs"
CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-statuses"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
CONNECT_CLEANUP_POLICY: "COMPACT"
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://***.confluent.cloud"
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: ***:***
CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins/"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.1.1.jar
#Connect Producer
CONNECT_PRODUCER_BOOTSTRAP_SERVERS: "***.confluent.cloud:9092"
CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
#Connect Consumer
CONNECT_CONSUMER_BOOTSTRAP_SERVERS: "***.confluent.cloud:9092"
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_CONSUMER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";
CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_CONSUMER_AUTO_OFFSET_RESET: "earliest"
And the debezium connector is as follows. (which successfully creates the history topic)
curl -i -X POST -H "Accept:application/json" -H "Content-Type: application/json" \
http://localhost:8083/connectors \
-d '{
"name": "calls_connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "example-user",
"database.password": "example-pw",
"database.allowPublicKeyRetrieval": "true",
"database.server.id": "184054",
"database.server.name": "call-center-db",
"database.whitelist": "call-center",
"database.history.kafka.bootstrap.servers": "***.confluent.cloud:9092",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.ssl.endpoint.identification.algorithm": "https",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"****\" password=\"***\";",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.ssl.endpoint.identification.algorithm": "https",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"***\";",
"database.history.kafka.topic": "call-center",
"table.whitelist": "call-center.calls",
"include.schema.changes": "false"
}
}'