Hi,
I am trying to run a local kafka connect container for Snowpipe Streaming (Snowflake Sink) and below is the definition/config. I am also trying to externalize the secrets and trying to use the AWS secrets Manager to fetch the configuration/credentials.
kafka-connect-integration/Dockerfile
FROM confluentinc/cp-kafka-connect:7.5.1
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components/"
RUN confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:2.0.1
RUN confluent-hub install --no-prompt confluentinc/csid-secrets-provider-aws:1.0.8
Connect Container
kafka-connect:
build: kafka-connect-integration
container_name: connect
hostname: connect
ports:
- 8083:8083
depends_on:
- broker
environment:
CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
CONNECT_GROUP_ID: "snow-streaming-grp1"
CONNECT_CONFIG_STORAGE_TOPIC: "_snow-streaming-grp1-configs"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: "_snow-streaming-grp1-offsets"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: "_snow-streaming-grp1-status"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "snow-streaming"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_PROVIDERS": "secretmanager"
CONNECT_CONFIG_PROVIDERS_SECRETMANAGER_CLASS": "io.confluent.csid.config.provider.aws.SecretsManagerConfigProvider"
CONNECT_CONFIG_PROVIDERS_SECRETMANAGER_PARAM_AWS_REGION": "us-west-2"
CONNECT_CONFIG_PROVIDERS_SECRETMANAGER_PARAM_AWS_ACCESS_KEY: "XXXXXXXXXXXXX"
CONNECT_CONFIG_PROVIDERS_SECRETMANAGER_PARAM_AWS_SECRET_KEY: "XXXXXXXXXXXXX"
Connector setup:
{
"name":"dna-snowflake-sink-edw_uat-public",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"1",
"topics":"snowflake-stream",
"snowflake.topic2table.map": "topic1:table1",
"buffer.count.records":"1",
"buffer.flush.time":"1",
"buffer.size.bytes":"5000000",
"config.providers": "secretmanager",
"config.providers.secretmanager.class": "io.confluent.csid.config.provider.aws.SecretsManagerConfigProvider",
"config.providers.secretmanager.param.aws.region": "us-west-2",
"config.providers.secretsManager.param.aws.access.key": "XXXXXXXXXX",
"config.providers.secretsManager.param.aws.secret.key": "XXXXXXXXXX",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.url.name":"<SNOWFLAKE_ACCOUNT>.snowflakecomputing.com:443",
"snowflake.user.name": "${secretmanager:aws_sectret_name:user}",
"snowflake.private.key":"<PRIVATE_KEY>",
"snowflake.database.name":"<DATABASE_NAME>",
"snowflake.schema.name":"DATABASE_SCHEMA",
"errors.log.enable": true,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false
}
}
When I POST the connector config to the rest API it seems that I am unable to get the USERNAME from the secrets manager.
The error that I get:
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 3 error(s):\nsnowflake.url.name: Cannot connect to Snowflake\nsnowflake.user.name: Cannot connect to Snowflake\nsnowflake.private.key: Cannot connect to Snowflake\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}
LOG:
2023-10-27 09:38:56 [2023-10-27 14:38:56,114] ERROR [SF_KAFKA_CONNECTOR] Validate: Error connecting to snowflake:[SF_KAFKA_CONNECTOR] Exception: Failed to connect to Snowflake Server
2023-10-27 09:38:56 Error Code: 1001
2023-10-27 09:38:56 Detail: Snowflake connection issue, reported by Snowflake JDBC
2023-10-27 09:38:56 Message: JWT token is invalid.
2023-10-27 09:38:56 net.snowflake.client.core.SessionUtil.newSession(SessionUtil.java:695)
2023-10-27 09:38:56 net.snowflake.client.core.SessionUtil.openSession(SessionUtil.java:291)
2023-10-27 09:38:56 net.snowflake.client.core.SFSession.open(SFSession.java:477)
2023-10-27 09:38:56 net.snowflake.client.jdbc.DefaultSFConnectionHandler.initialize(DefaultSFConnectionHandler.java:104)
2023-10-27 09:38:56 net.snowflake.client.jdbc.DefaultSFConnectionHandler.initializeConnection(DefaultSFConnectionHandler.java:79)
2023-10-27 09:38:56 net.snowflake.client.jdbc.SnowflakeConnectionV1.initConnectionWithImpl(SnowflakeConnectionV1.java:116)
2023-10-27 09:38:56 net.snowflake.client.jdbc.SnowflakeConnectionV1.<init>(SnowflakeConnectionV1.java:96)
2023-10-27 09:38:56 net.snowflake.client.jdbc.SnowflakeDriver.connect(SnowflakeDriver.java:180)
2023-10-27 09:38:56 com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.<init>(SnowflakeConnectionServiceV1.java:82)
2023-10-27 09:38:56 com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory$SnowflakeConnectionServiceBuilder.build(SnowflakeConnectionServiceFactory.java:83)
2023-10-27 09:38:56 com.snowflake.kafka.connector.SnowflakeSinkConnector.validate(SnowflakeSinkConnector.java:223)
2023-10-27 09:38:56 org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:509)
2023-10-27 09:38:56 org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:390)
2023-10-27 09:38:56 java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-10-27 09:38:56 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-10-27 09:38:56 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-10-27 09:38:56 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-10-27 09:38:56 java.base/java.lang.Thread.run(Thread.java:829), errorCode:1001 (com.snowflake.kafka.connector.SnowflakeSinkConnector)
2023-10-27 09:38:56 [2023-10-27 14:38:56,209] INFO SecretsManagerConfigProviderConfig values:
2023-10-27 09:38:56 aws.access.key =
2023-10-27 09:38:56 aws.region = us-west-2
2023-10-27 09:38:56 aws.secret.key = [hidden]
2023-10-27 09:38:56 polling.enabled = true
2023-10-27 09:38:56 polling.interval.seconds = 300
2023-10-27 09:38:56 retry.count = 3
2023-10-27 09:38:56 retry.interval.seconds = 10
2023-10-27 09:38:56 secret.prefix =
2023-10-27 09:38:56 secret.ttl.ms = 300000
2023-10-27 09:38:56 thread.count = 3
2023-10-27 09:38:56 timeout.seconds = 30
2023-10-27 09:38:56 (io.confluent.csid.config.provider.aws.SecretsManagerConfigProviderConfig)
2023-10-27 09:38:56 [2023-10-27 14:38:56,872] INFO AbstractConfig values:
2023-10-27 09:38:56 (org.apache.kafka.common.config.AbstractConfig)
I do not understand what is incorrect in the config provider OR see anything in the logs as well.
Any tips on how to check what might be causing the issue?
Thanks,
Hitesh