Connect Docker Kafka Connect to Confluent Cloud

We’re looking to start using Confluent Cloud’s Kafka to manage CDC from our database. We use Spanner, which isn’t currently supported as a managed connector in Confluent Cloud, but there is a connector that I have gotten working locally (local Kafka, Connect, and ZooKeeper via Docker Compose).

I’ve tried connecting a local Dockerized Connect worker to Confluent Cloud’s Kafka by following this. Currently I’m having issues with timeouts and am trying to resolve them.

I can see that the Connect topics (i.e. demo-connect-configs/offsets/status) are created in Confluent Cloud but when I try to use the connector I get the following error logs:

connect  | [2024-02-03 01:24:36,871] INFO [AdminClient clientId=adminclient-2] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
connect  | [2024-02-03 01:24:36,871] INFO [AdminClient clientId=adminclient-2] Cancelled in-flight API_VERSIONS request with correlation id 137 due to node -1 being disconnected (elapsed time since creation: 5032ms, elapsed time since send: 5032ms, request timeout: 3600000ms) (org.apache.kafka.clients.NetworkClient)
connect  | [2024-02-03 01:24:37,690] INFO [AdminClient clientId=adminclient-2] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
connect  | org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
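
For what it’s worth, the worker itself starts, and I’ve been checking connector and task state through its REST API (port 8083 is mapped in the compose file below); the connector name here is just a placeholder:

# List connectors and check the status of one (hypothetical name)
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/spanner-connector/status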

I believe I am setting all of the environment variables correctly (at least, I’m following the instructions in the GitHub repo):

version: "3.7"
name: docker-connect
services:
  connect:
    image: spanner-connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: XXX.gcp.confluent.cloud:9092
      CONNECT_GROUP_ID: "connect-unique"
      CONNECT_CONFIG_STORAGE_TOPIC: demo-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: demo-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: demo-connect-status
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://XXX.gcp.confluent.cloud
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: XXX:XXXX
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.0.jar
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
      CONNECT_SASL_MECHANISM: PLAIN
      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";
      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
    volumes:
      - ./secret_file.json:/opt/gcp-creds.json
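
As a sanity check, listing the installed plugins over the worker’s REST API confirms the Spanner connector class is on the plugin path:

# Confirm the Debezium Spanner plugin was picked up by the worker
curl -s http://localhost:8083/connector-plugins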

Is there anything extra that I have to do to connect to a Confluent Cloud Kafka broker with a custom connector? I’m trying to use the Debezium Spanner Connector.

Are you using private networking or secure public endpoints?

Try using nc to make sure that the cluster is reachable. From inside the Docker container:

  1. Ensure connectivity to the bootstrap servers endpoint:
nc -vz pkc-12345.us-east-2.aws.confluent.cloud 9092
  2. Get the list of brokers with kcat:
kcat -L -b pkc-12345.us-east-2.aws.confluent.cloud:9092 \
     -X security.protocol=sasl_ssl \
     -X sasl.mechanisms=PLAIN \
     -X sasl.username=<API KEY> \
     -X sasl.password=<SECRET>

This will output something like:

 6 brokers:
  broker 0 at b0-pkc-12345.us-east-2.aws.confluent.cloud:9092 (controller)
  broker 1 at b1-pkc-12345.us-east-2.aws.confluent.cloud:9092
  broker 2 at b2-pkc-12345.us-east-2.aws.confluent.cloud:9092
  broker 3 at b3-pkc-12345.us-east-2.aws.confluent.cloud:9092
  broker 4 at b4-pkc-12345.us-east-2.aws.confluent.cloud:9092
  broker 5 at b5-pkc-12345.us-east-2.aws.confluent.cloud:9092
  3. Ensure connectivity to the individual brokers (substitute the number of brokers in the for loop):
for b in {0..5}; do nc -vz b${b}-pkc-12345.us-east-2.aws.confluent.cloud 9092; done

This should output success for each broker:

Connection to b0-pkc-12345.us-east-2.aws.confluent.cloud port 9092 [tcp/XmlIpcRegSvc] succeeded!
Connection to b1-pkc-12345.us-east-2.aws.confluent.cloud port 9092 [tcp/XmlIpcRegSvc] succeeded!
Connection to b2-pkc-12345.us-east-2.aws.confluent.cloud port 9092 [tcp/XmlIpcRegSvc] succeeded!
Connection to b3-pkc-12345.us-east-2.aws.confluent.cloud port 9092 [tcp/XmlIpcRegSvc] succeeded!
Connection to b4-pkc-12345.us-east-2.aws.confluent.cloud port 9092 [tcp/XmlIpcRegSvc] succeeded!
Connection to b5-pkc-12345.us-east-2.aws.confluent.cloud port 9092 [tcp/XmlIpcRegSvc] succeeded!

I am using the secure public endpoint.

The output is slightly different but I think it’s the same result:

[root@e9ddc982c46c kafkacat]# for b in {0..11}; do nc -vz b${b}-pkc-1234.us-central1.gcp.confluent.cloud 9092; done
Ncat: Version 7.92
Ncat: Connected to 34.28.16.239:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.07 seconds.
Ncat: Version 7.92
Ncat: Connected to 35.225.23.60:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.05 seconds.
Ncat: Version 7.92
Ncat: Connected to 34.71.102.98:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.06 seconds.
Ncat: Version 7.92
Ncat: Connected to 34.134.14.84:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.05 seconds.
Ncat: Version 7.92
Ncat: Connected to 34.173.126.255:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.06 seconds.
Ncat: Version 7.92
Ncat: Connected to 34.122.15.39:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.05 seconds.
Ncat: Version 7.92
Ncat: Connected to 34.68.135.8:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.05 seconds.
Ncat: Version 7.92
Ncat: Connected to 34.29.143.151:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.05 seconds.
Ncat: Version 7.92
Ncat: Connected to 34.136.2.196:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.06 seconds.
Ncat: Version 7.92
Ncat: Connected to 104.197.138.20:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.05 seconds.
Ncat: Version 7.92
Ncat: Connected to 104.155.165.107:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.06 seconds.
Ncat: Version 7.92
Ncat: Connected to 35.184.131.222:9092.
Ncat: 0 bytes sent, 0 bytes received in 0.06 seconds.

Nothing is jumping out at me from the worker config. Could you share your connector config?

Sure, it’s:

{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "$GCP_PROJECT",
    "gcp.spanner.instance.id": "$SPANNER_INSTANCE,
    "gcp.spanner.database.id": "$SPANNER_DATABASE",
    "gcp.spanner.change.stream": "$CHANGE_STREAM",
    "gcp.spanner.credentials.path": "/opt/gcp-creds.json",
    "tasks.max": "1"
}

It’s taken directly from the Google Cloud Spanner doc "Build change streams connections to Kafka".
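
For completeness, this is roughly how I register it against the local worker (the name and config values here are placeholders; the real ones come from our environment):

# Register the connector with the local Connect worker (placeholder values)
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "spanner-connector",
        "config": {
          "connector.class": "io.debezium.connector.spanner.SpannerConnector",
          "gcp.spanner.project.id": "my-project",
          "gcp.spanner.instance.id": "my-instance",
          "gcp.spanner.database.id": "my-database",
          "gcp.spanner.change.stream": "my_change_stream",
          "gcp.spanner.credentials.path": "/opt/gcp-creds.json",
          "tasks.max": "1"
        }
      }'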

I’m reaching, but the error is coming from the Kafka admin client (instantiated here, configured here). Based on the config code, it’ll use any properties prefixed with kafka.internal.client. (and remove the prefix), so try adding the following connector properties:

    "kafka.internal.client.security.protocol": "SASL_SSL",
    "kafka.internal.client.sasl.mechanism": "PLAIN",
    "kafka.internal.client.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<API KEY>\" password=\"<SECRET>\";",

If that doesn’t help, you might try filing a DBZ JIRA in case there’s a connector-specific issue we are missing.

That did it! Thank you, never would have found that.

I also had to set "connector.spanner.sync.max.message.bytes": 8388608 because the default value was larger than what was allowed, but setting that along with the settings above got it all working.
