Kafka Connect externalize secrets AWS Secrets Manager

Hi,

I am trying to run a local kafka connect container for Snowpipe Streaming (Snowflake Sink) and below is the definition/config. I am also trying to externalize the secrets and trying to use the AWS secrets Manager to fetch the configuration/credentials.

kafka-connect-integration/Dockerfile

FROM confluentinc/cp-kafka-connect:7.5.1

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components/"

RUN confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:2.0.1
RUN confluent-hub install --no-prompt confluentinc/csid-secrets-provider-aws:1.0.8

Connect Container

kafka-connect:
      build: kafka-connect-integration
      container_name: connect
      hostname: connect
      ports:
        - 8083:8083
      depends_on:
        - broker
      environment:
        CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
        CONNECT_GROUP_ID: "snow-streaming-grp1"
        CONNECT_CONFIG_STORAGE_TOPIC: "_snow-streaming-grp1-configs"
        CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_OFFSET_STORAGE_TOPIC: "_snow-streaming-grp1-offsets"
        CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_STATUS_STORAGE_TOPIC: "_snow-streaming-grp1-status"
        CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_REST_ADVERTISED_HOST_NAME: "snow-streaming"
        CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/"
        CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
        CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
        CONNECT_CONFIG_PROVIDERS": "secretmanager"
        CONNECT_CONFIG_PROVIDERS_SECRETMANAGER_CLASS": "io.confluent.csid.config.provider.aws.SecretsManagerConfigProvider"
        CONNECT_CONFIG_PROVIDERS_SECRETMANAGER_PARAM_AWS_REGION": "us-west-2"
        CONNECT_CONFIG_PROVIDERS_SECRETMANAGER_PARAM_AWS_ACCESS_KEY: "XXXXXXXXXXXXX"
        CONNECT_CONFIG_PROVIDERS_SECRETMANAGER_PARAM_AWS_SECRET_KEY: "XXXXXXXXXXXXX"

Connector setup:

{
    "name":"dna-snowflake-sink-edw_uat-public",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"1",
      "topics":"snowflake-stream",
      "snowflake.topic2table.map": "topic1:table1",
      "buffer.count.records":"1",
      "buffer.flush.time":"1",
      "buffer.size.bytes":"5000000",
      "config.providers": "secretmanager",
      "config.providers.secretmanager.class": "io.confluent.csid.config.provider.aws.SecretsManagerConfigProvider",
      "config.providers.secretmanager.param.aws.region": "us-west-2",
      "config.providers.secretsManager.param.aws.access.key": "XXXXXXXXXX",
      "config.providers.secretsManager.param.aws.secret.key": "XXXXXXXXXX",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING", 
      "snowflake.role.name": "SNOWFLAKE_ROLE",
      "snowflake.url.name":"<SNOWFLAKE_ACCOUNT>.snowflakecomputing.com:443",
      "snowflake.user.name": "${secretmanager:aws_sectret_name:user}",
      "snowflake.private.key":"<PRIVATE_KEY>",
      "snowflake.database.name":"<DATABASE_NAME>",
      "snowflake.schema.name":"DATABASE_SCHEMA",
      "errors.log.enable": true,
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": false,
      "value.converter.schemas.enable": false
    }
  }

When I POST the connector config to the rest API it seems that I am unable to get the USERNAME from the secrets manager.

The error that I get:

{
    "error_code": 400,
    "message": "Connector configuration is invalid and contains the following 3 error(s):\nsnowflake.url.name: Cannot connect to Snowflake\nsnowflake.user.name: Cannot connect to Snowflake\nsnowflake.private.key: Cannot connect to Snowflake\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}

LOG:

2023-10-27 09:38:56 [2023-10-27 14:38:56,114] ERROR [SF_KAFKA_CONNECTOR] Validate: Error connecting to snowflake:[SF_KAFKA_CONNECTOR] Exception: Failed to connect to Snowflake Server
2023-10-27 09:38:56 Error Code: 1001
2023-10-27 09:38:56 Detail: Snowflake connection issue, reported by Snowflake JDBC
2023-10-27 09:38:56 Message: JWT token is invalid.
2023-10-27 09:38:56 net.snowflake.client.core.SessionUtil.newSession(SessionUtil.java:695)
2023-10-27 09:38:56 net.snowflake.client.core.SessionUtil.openSession(SessionUtil.java:291)
2023-10-27 09:38:56 net.snowflake.client.core.SFSession.open(SFSession.java:477)
2023-10-27 09:38:56 net.snowflake.client.jdbc.DefaultSFConnectionHandler.initialize(DefaultSFConnectionHandler.java:104)
2023-10-27 09:38:56 net.snowflake.client.jdbc.DefaultSFConnectionHandler.initializeConnection(DefaultSFConnectionHandler.java:79)
2023-10-27 09:38:56 net.snowflake.client.jdbc.SnowflakeConnectionV1.initConnectionWithImpl(SnowflakeConnectionV1.java:116)
2023-10-27 09:38:56 net.snowflake.client.jdbc.SnowflakeConnectionV1.<init>(SnowflakeConnectionV1.java:96)
2023-10-27 09:38:56 net.snowflake.client.jdbc.SnowflakeDriver.connect(SnowflakeDriver.java:180)
2023-10-27 09:38:56 com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.<init>(SnowflakeConnectionServiceV1.java:82)
2023-10-27 09:38:56 com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.build(SnowflakeConnectionServiceFactory.java:83)
2023-10-27 09:38:56 com.snowflake.kafka.connector.SnowflakeSinkConnector.validate(SnowflakeSinkConnector.java:223)
2023-10-27 09:38:56 org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:509)
2023-10-27 09:38:56 org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:390)
2023-10-27 09:38:56 java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-10-27 09:38:56 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-10-27 09:38:56 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-10-27 09:38:56 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-10-27 09:38:56 java.base/java.lang.Thread.run(Thread.java:829), errorCode:1001 (com.snowflake.kafka.connector.SnowflakeSinkConnector)
2023-10-27 09:38:56 [2023-10-27 14:38:56,209] INFO SecretsManagerConfigProviderConfig values: 
2023-10-27 09:38:56     aws.access.key = 
2023-10-27 09:38:56     aws.region = us-west-2
2023-10-27 09:38:56     aws.secret.key = [hidden]
2023-10-27 09:38:56     polling.enabled = true
2023-10-27 09:38:56     polling.interval.seconds = 300
2023-10-27 09:38:56     retry.count = 3
2023-10-27 09:38:56     retry.interval.seconds = 10
2023-10-27 09:38:56     secret.prefix = 
2023-10-27 09:38:56     secret.ttl.ms = 300000
2023-10-27 09:38:56     thread.count = 3
2023-10-27 09:38:56     timeout.seconds = 30
2023-10-27 09:38:56  (io.confluent.csid.config.provider.aws.SecretsManagerConfigProviderConfig)
2023-10-27 09:38:56 [2023-10-27 14:38:56,872] INFO AbstractConfig values: 
2023-10-27 09:38:56  (org.apache.kafka.common.config.AbstractConfig)

I do not understand what is incorrect in the config provider OR see anything in the logs as well.

Any tips on how to check what might be causing the issue?

Thanks,
Hitesh

Got this to work and the issue is pretty stupid one :slight_smile:

Quotes after the ENV variables

CONNECT_CONFIG_PROVIDERS": "secretmanager" 
CONNECT_CONFIG_PROVIDERS_SECRETMANAGER_CLASS": "io.confluent.csid.config.provider.aws.SecretsManagerConfigProvider"
CONNECT_CONFIG_PROVIDERS_SECRETMANAGER_PARAM_AWS_REGION": "us-west-2"

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.