We’re looking to start using Confluent Cloud’s Kafka to manage CDC from our database. We use Spanner, which isn’t currently supported as a managed connector in Confluent Cloud, but there is a connector that I have gotten working locally (local Kafka, Connect, and ZooKeeper via Docker Compose).
I’ve tried connecting a local Docker image to Confluent Cloud’s Kafka by following this. Currently I’m running into time-outs and am trying to resolve them.
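A standalone client check along these lines would separate plain network/auth failures from Connect-specific ones (this is a sketch — the endpoint and credentials are placeholders, and kafka-broker-api-versions is the Confluent Platform CLI name; Apache Kafka ships it as kafka-broker-api-versions.sh):

# client.properties: a plain SASL_SSL client using the same credentials as the worker
cat > client.properties <<'EOF'
bootstrap.servers=XXX.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
EOF

# If this also times out, the problem is connectivity/auth rather than the Connect worker
kafka-broker-api-versions --bootstrap-server XXX.gcp.confluent.cloud:9092 \
  --command-config client.properties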
I can see that the Connect topics (i.e. demo-connect-configs/offsets/status) are created in Confluent Cloud, but when I try to use the connector I get the following error logs:
connect | [2024-02-03 01:24:36,871] INFO [AdminClient clientId=adminclient-2] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
connect | [2024-02-03 01:24:36,871] INFO [AdminClient clientId=adminclient-2] Cancelled in-flight API_VERSIONS request with correlation id 137 due to node -1 being disconnected (elapsed time since creation: 5032ms, elapsed time since send: 5032ms, request timeout: 3600000ms) (org.apache.kafka.clients.NetworkClient)
connect | [2024-02-03 01:24:37,690] INFO [AdminClient clientId=adminclient-2] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
connect | org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
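(By “use the connector” I mean registering it against the worker’s REST API, roughly like the call below. Treat this as a sketch — the gcp.spanner.* property names are taken from the Debezium Spanner documentation, and all values are placeholders.)

# Register the Debezium Spanner connector with the local Connect worker
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "spanner-cdc",
        "config": {
          "connector.class": "io.debezium.connector.spanner.SpannerConnector",
          "tasks.max": "1",
          "gcp.spanner.project.id": "XXX",
          "gcp.spanner.instance.id": "XXX",
          "gcp.spanner.database.id": "XXX",
          "gcp.spanner.change.stream": "XXX",
          "gcp.spanner.credentials.path": "/secret_file.json"
        }
      }'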
I believe I am setting all of the environment variables correctly (at least, I am following the instructions in the GitHub repo). Here is my docker-compose.yml:
version: "3.7"
name: docker-connect
services:
  connect:
    image: spanner-connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: XXX.gcp.confluent.cloud:9092
      CONNECT_GROUP_ID: "connect-unique"
      CONNECT_CONFIG_STORAGE_TOPIC: demo-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: demo-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: demo-connect-status
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://XXX.gcp.confluent.cloud
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: XXX:XXXX
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.0.jar
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
      CONNECT_SASL_MECHANISM: PLAIN
      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
    volumes:
      - secretFile:secret_file.json
Is there anything extra I have to do to connect to a Confluent Cloud Kafka broker with a custom connector? I’m trying to use the Debezium Spanner Connector.
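For completeness, the spanner-connect image is essentially a stock Connect image with the Debezium Spanner plugin placed on the plugin path. A sketch of the Dockerfile (the base image tag and plugin directory name are assumptions, not the exact build):

# Dockerfile sketch: stock Connect image plus the Debezium Spanner plugin
FROM confluentinc/cp-kafka-connect:7.5.0
# The directory name is illustrative; it just has to sit under CONNECT_PLUGIN_PATH
COPY debezium-connector-spanner/ /usr/share/confluent-hub-components/debezium-connector-spanner/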