Configuring self-managed Confluent connectors with Confluent Cloud

If you’re using Confluent connectors that are licensed, and therefore need a license topic, some additional configuration is required. Without it, the connector fails with a "License topic could not be created" error like this:

ERROR WorkerConnector{id=my-connector} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
org.apache.kafka.common.errors.TimeoutException: License topic could not be created
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1611233799702, tries=1, nextAllowedTryMs=1611233799803) timed out at 1611233799703 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
ERROR [Worker clientId=connect-1, groupId=my-connect] Failed to start connector 'my-connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: my-connector

Regardless of which Kafka broker you use, you’ll always need to set these two properties in your connector config:

"confluent.topic.bootstrap.servers"
"confluent.topic.replication.factor"

But for secured clusters (like Confluent Cloud) you also need to configure the security details:

"confluent.topic.security.protocol"
"confluent.topic.sasl.jaas.config"
"confluent.topic.sasl.mechanism"

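As a quick sanity check before deploying, the two lists above can be turned into a small helper that reports which license-topic keys a connector config is still missing. This is only a sketch: the function name `missing_license_keys` and the `secured` flag are made up for illustration and are not part of any Confluent tooling.

```python
# Keys every licensed connector needs, per the first list above.
BASE_KEYS = [
    "confluent.topic.bootstrap.servers",
    "confluent.topic.replication.factor",
]

# Extra keys for secured clusters such as Confluent Cloud.
SECURITY_KEYS = [
    "confluent.topic.security.protocol",
    "confluent.topic.sasl.jaas.config",
    "confluent.topic.sasl.mechanism",
]


def missing_license_keys(connector_config, secured=True):
    """Return the license-topic keys absent from a connector config dict.

    Hypothetical helper: `secured` says whether the target cluster
    needs the security keys as well (True for Confluent Cloud).
    """
    required = BASE_KEYS + (SECURITY_KEYS if secured else [])
    return [key for key in required if key not in connector_config]
```

An empty list back means all the expected keys are present; it does not check that the values themselves are correct.
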
So the config to add to your connector (not the worker) would look something like this:

"confluent.topic.bootstrap.servers" : "CCLOUD_BOOTSTRAP_URL",
"confluent.topic.replication.factor": 3,
"confluent.topic.security.protocol" : "SASL_SSL",
"confluent.topic.sasl.jaas.config"  : "org.apache.kafka.common.security.plain.PlainLoginModule required username='CCLOUD_USER' password='CCLOUD_SECRET';",
"confluent.topic.sasl.mechanism"    : "PLAIN"

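If you generate connector configs from a script, the same properties can be built programmatically and merged into an existing config. The following is a rough sketch, not an official client: the function names and parameters (`license_topic_config`, `with_license_config`, `bootstrap_url`, `api_key`, `api_secret`) are assumptions for illustration, and the values you pass in would be your own Confluent Cloud bootstrap URL and API key/secret.

```python
def license_topic_config(bootstrap_url, api_key, api_secret, replication_factor=3):
    """Build the confluent.topic.* properties for a SASL_SSL/PLAIN cluster.

    Hypothetical helper; the keys and values mirror the config shown above.
    """
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{api_key}' password='{api_secret}';"
    )
    return {
        "confluent.topic.bootstrap.servers": bootstrap_url,
        "confluent.topic.replication.factor": replication_factor,
        "confluent.topic.security.protocol": "SASL_SSL",
        "confluent.topic.sasl.jaas.config": jaas,
        "confluent.topic.sasl.mechanism": "PLAIN",
    }


def with_license_config(connector_config, license_config):
    """Return a copy of the connector config with the license keys added."""
    merged = dict(connector_config)  # leave the caller's dict untouched
    merged.update(license_config)
    return merged
```

The merged dict can then be serialised with `json.dumps` and submitted however you normally deploy connectors, for example through the Kafka Connect REST API.
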
For general info on setting up a self-managed connector with Confluent Cloud see this blog and the docs.
