Confluent Kafka self-managed Lambda sink connector configuration issues

Hello,
Here’s my scenario: I’m currently running Kafka Connect in a Docker container in distributed mode, and I’m hitting the following exceptions when I post the connector configuration.

[2022-05-06 05:05:35,050] ERROR Failed to start task LambdaSinkConnectorProdPull-0 (org.apache.kafka.connect.runtime.Worker)
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:442)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:292)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:275)
        at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.createAndSetup(DeadLetterQueueReporter.java:97)
        at org.apache.kafka.connect.runtime.Worker.sinkTaskReporters(Worker.java:784)
        at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:622)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:545)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1421)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$22(DistributedHerder.java:1434)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
        at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:431)
        ... 12 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
        at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:303)
        at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
        at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:61)
        at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
        ... 17 more
[2022-05-06 05:05:35,051] ERROR [Worker clientId=connect-1, groupId=quickstart] Couldn't instantiate task LambdaSinkConnectorProdPull-0 because it has an invalid task configuration. This task will not execute until reconfigured. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
java.lang.NullPointerException
        at org.apache.kafka.connect.runtime.Worker$ConnectorStatusMetricsGroup.recordTaskRemoved(Worker.java:1037)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:554)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1421)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$22(DistributedHerder.java:1434)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Here’s my Dockerfile, the command I use to start the container, and the connector config.

Dockerfile:

FROM confluentinc/cp-kafka-connect-base:7.1.1

RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-aws-lambda:2.0.1
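
For reference, I build and tag the image like this (the `lambda-sink` tag is the same one the `docker run` command uses as the image name):

```shell
# Build the Connect image with the Lambda sink connector baked in
docker build -t lambda-sink .
```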

Here’s how I run the container on my EC2 instance:

docker run -d \
  --name=kafka-connect \
  --net=host \
  -e CONNECT_BOOTSTRAP_SERVERS=xxxxx:9092 \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_GROUP_ID="quickstart" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="quickstart-status" \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
  -e CONNECT_LISTENERS="http://0.0.0.0:8083" \
  -e CONNECT_HEAP_OPTS="-Xmx512M -Xms512M" \
  -e CONNECT_CONSUMER_REQUEST_TIMEOUT_MS=20000 \
  -e CONNECT_SECURITY_PROTOCOL="SASL_SSL" \
  -e CONNECT_SASL_MECHANISM="PLAIN" \
  -e CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE="USERINFO" \
  -e CONNECT_PRODUCER_SECURITY_PROTOCOL="SASL_SSL" \
  -e CONNECT_CONSUMER_SECURITY_PROTOCOL="SASL_SSL" \
  -e CONNECT_CONSUMER_SASL_MECHANISM="PLAIN" \
  -e CONNECT_CONSUMER_RETRY_BACKOFF_MS=500 \
  -e AWS_DEFAULT_REGION="us-east-1" \
  -e AWS_ACCESS_KEY_ID="xxxxx" \
  -e AWS_SECRET_ACCESS_KEY="xxxx" \
  -e CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";" \
  lambda-sink

Here’s my LambdaSink Connector config:

{
  "name": "LambdaSinkConnectorPull",
  "config": {
    "aws.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "aws.access.key.id":"xxx",
    "aws.secret.access.key":"xxxxx",
    "aws.lambda.function.name": "app-prod-pull-gdev-dev-srikanth-prodKafkaConnect",
    "aws.lambda.invocation.type": "sync",
    "aws.lambda.invocation.timeout.ms": "300000",
    "aws.lambda.batch.size": "25",
    "aws.lambda.region": "us-east-1",
    "behavior.on.error": "fail",
    "connector.class": "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
    "confluent.topic.bootstrap.servers": "xxxx:9092",
    "confluent.topic.replication.factor": "3",
    "confluent.topic.security.protocol": "SASL_SSL",
    "confluent.topic.ssl.endpoint.identification.algorithm": "HTTPS",
    "sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule   required username=\"xxxx\"   password=\"xxxx\";",
    "producer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule   required username=\"xxxx\"   password=\"xxxx\";",
    "consumer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule   required username=\"xxxx\"   password=\"xxxx\";",
    "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule   required username=\"xxxx\"   password=\"xxxx\";",
    "confluent.topic.sasl.mechanism": "PLAIN",
    "consumer.override.fetch.min.bytes": "100000",
    "errors.retry.timeout": "30000",
    "errors.retry.delay.max.ms": "10000",
    "errors.tolerance": "none",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.topic.name": "xxxx_GARBAGE",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "6",
    "topics": "xxxx",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
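
For completeness, this is roughly how I post the config (the file name `lambda-sink.json` is just what I call it locally; the port matches `CONNECT_LISTENERS` above). Validating the JSON first catches things like trailing commas before Connect rejects them:

```shell
# Save the connector config (abridged here) and sanity-check it before posting
cat > /tmp/lambda-sink.json <<'EOF'
{
  "name": "LambdaSinkConnectorPull",
  "config": {
    "connector.class": "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
    "topics": "xxxx",
    "tasks.max": "6"
  }
}
EOF

# json.tool exits non-zero on malformed JSON (e.g. a trailing comma)
python3 -m json.tool /tmp/lambda-sink.json > /dev/null && echo "JSON OK"

# Then submit it to the Connect REST API (returns the created connector as JSON):
# curl -s -X POST -H "Content-Type: application/json" \
#      --data @/tmp/lambda-sink.json http://localhost:8083/connectors
```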

Can you please point out where I’m going wrong? I also sense that I’m duplicating config between the worker config and the connector config.

Can you also tell me what the right way to do it is?

Thanks

@rmoff
Hello, I have watched some of your videos on setting up Kafka Connect, and I noticed that you’d rather we ask questions in the forum than in YouTube comments.
So far in my research I couldn’t spot exactly what is wrong with the configuration; after trial and error with lots of different configs I still couldn’t get there, hence this post.
I have also seen this POST, which is close to my question, but there wasn’t much activity on it.
I have Confluent running the Kafka cluster, and I’m running Connect on an EC2 machine; you can see the configs I’m trying in the post above.

Hello Community, I found all the answers. Here’s my experience finding the right config to make it work, and I’ll attach the final config at the end.

I wanted to get some help from Confluent, but we can’t afford a $1000 support plan, so I dug into it on my own.

At first I wanted to understand why this error occurs by looking at the Kafka Connect and Lambda sink connector code, but to my surprise, since the connector is Confluent-owned, the source isn’t available. I tried digging into the Lambda sink JAR, with no success there either. So I went through the worker log file line by line to understand every statement.
Then I realized that I had somehow missed adding the SASL mechanism in the reporter configs:

    "reporter.admin.sasl.mechanism": "PLAIN",
    "reporter.producer.sasl.mechanism": "PLAIN",
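
From what I understand, the reporter clients the connector creates need their own SASL settings, and without an explicit `sasl.mechanism` a Kafka client defaults to GSSAPI (Kerberos), which is exactly the “No serviceName defined in either JAAS or Kafka config” error above. So each reporter client needs the full trio: security protocol, mechanism, and JAAS config. A sketch of just that part of the connector config (credentials are placeholders):

```json
"reporter.admin.security.protocol": "SASL_SSL",
"reporter.admin.sasl.mechanism": "PLAIN",
"reporter.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
"reporter.producer.security.protocol": "SASL_SSL",
"reporter.producer.sasl.mechanism": "PLAIN",
"reporter.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";"
```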

I also faced some issues with configs related to the admin client, which I fixed (though I forget exactly how); I’ll post the entire config here for everyone’s reference.

Here’s my worker config:

docker run -d \
  --name=kafka-connect \
  --net=host \
  -e CONNECT_GROUP_ID="connect" \
  -e CONNECT_BOOTSTRAP_SERVERS="xxxx.us-east-1.aws.confluent.cloud:9092" \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="quickstart-status" \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE="false" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="CCLOUD_SR_URL" \
  -e CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE="USER_INFO" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/filestream-connectors,/usr/share/confluent-hub-components" \
  -e CONNECT_LISTENERS="http://0.0.0.0:8083" \
  -e CONNECT_HEAP_OPTS="-Xmx512M -Xms512M" \
  -e CONNECT_CONSUMER_REQUEST_TIMEOUT_MS=20000 \
  -e CONNECT_SECURITY_PROTOCOL="SASL_SSL" \
  -e CONNECT_PRODUCER_SASL_MECHANISM="PLAIN" \
  -e CONNECT_SASL_MECHANISM="PLAIN" \
  -e CONNECT_PRODUCER_SECURITY_PROTOCOL="SASL_SSL" \
  -e CONNECT_CONSUMER_SECURITY_PROTOCOL="SASL_SSL" \
  -e CONNECT_CONSUMER_SASL_MECHANISM="PLAIN" \
  -e CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM="HTTPS" \
  -e CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM="HTTPS" \
  -e CONNECT_REQUEST_TIMEOUT_MS="50000" \
  -e CONNECT_CONSUMER_REQUEST_TIMEOUT_MS="50000" \
  -e CONNECT_CONSUMER_RETRY_BACKOFF_MS=500 \
  -e AWS_DEFAULT_REGION="us-east-1" \
  -e AWS_ACCESS_KEY_ID="xxxx" \
  -e AWS_SECRET_ACCESS_KEY="xxxx" \
  -e CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";" \
  -e CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";" \
  -e CONNECT_CONSUMER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";" \
  lambda-sink

Here’s my Lambda Sink Config:

 
{
  "name": "LambdaSinkConnectorPull",
  "config": {
    "aws.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "aws.access.key.id": "xx",
    "aws.secret.access.key": "xxxx",
    "aws.lambda.function.name": "app-prod-pull-gdev-dev-srikanth-pullPOCKafkaConnect",
    "aws.lambda.invocation.type": "sync",
    "aws.lambda.invocation.timeout.ms": "300000",
    "aws.lambda.batch.size": "25",
    "aws.lambda.region": "us-east-1",
    "behavior.on.error": "fail",
    "connector.class": "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
    "confluent.topic.bootstrap.servers": "xxxx.us-east-1.aws.confluent.cloud:9092",
    "confluent.topic.replication.factor": "3",
    "confluent.topic.security.protocol": "SASL_SSL",
    "confluent.topic.ssl.endpoint.identification.algorithm": "HTTPS",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "confluent.topic.sasl.mechanism": "PLAIN",
    "producer.sasl.mechanism": "PLAIN",
    "consumer.sasl.mechanism": "PLAIN",
    "sasl.mechanism": "PLAIN",
    "consumer.override.fetch.min.bytes": "100000",
    "reporter.bootstrap.servers": "xxxx.us-east-1.aws.confluent.cloud:9092",
    "reporter.result.topic.replication.factor": "3",
    "reporter.result.topic.name": "PROD_PULL.RESULT",
    "reporter.admin.security.protocol": "SASL_SSL",
    "reporter.admin.ssl.endpoint.identification.algorithm": "HTTPS",
    "reporter.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "reporter.admin.sasl.mechanism": "PLAIN",
    "reporter.producer.security.protocol": "SASL_SSL",
    "reporter.producer.ssl.endpoint.identification.algorithm": "HTTPS",
    "reporter.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "reporter.producer.sasl.mechanism": "PLAIN",
    "errors.retry.timeout": "30000",
    "errors.retry.delay.max.ms": "10000",
    "errors.tolerance": "none",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.topic.name": "PROD_PULL_GARBAGE",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "6",
    "topics": "PULL_PRODUCTS",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}

Hopefully this helps anyone who runs into this issue.


This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.