Hello,
Here’s my scenario: I am currently running Kafka Connect in a Docker container in distributed mode. I get the following exceptions when I post the connector configuration.
[2022-05-06 05:05:35,050] ERROR Failed to start task LambdaSinkConnectorProdPull-0 (org.apache.kafka.connect.runtime.Worker)
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:442)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:292)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:275)
at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.createAndSetup(DeadLetterQueueReporter.java:97)
at org.apache.kafka.connect.runtime.Worker.sinkTaskReporters(Worker.java:784)
at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:622)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:545)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1421)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$22(DistributedHerder.java:1434)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:431)
... 12 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:303)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:61)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
... 17 more
[2022-05-06 05:05:35,051] ERROR [Worker clientId=connect-1, groupId=quickstart] Couldn't instantiate task LambdaSinkConnectorProdPull-0 because it has an invalid task configuration. This task will not execute until reconfigured. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
java.lang.NullPointerException
at org.apache.kafka.connect.runtime.Worker$ConnectorStatusMetricsGroup.recordTaskRemoved(Worker.java:1037)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:554)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1421)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$22(DistributedHerder.java:1434)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Here’s my Dockerfile, the command I use to start the container, and the connector config.
Dockerfile:
FROM confluentinc/cp-kafka-connect-base:7.1.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-aws-lambda:2.0.1
Here’s how I am running the container on my EC2 instance:
docker run -d \
--name=kafka-connect \
--net=host \
-e CONNECT_BOOTSTRAP_SERVERS=xxxxx:9092 \
-e CONNECT_REST_PORT=8083 \
-e CONNECT_GROUP_ID="quickstart" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-status" \
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
-e CONNECT_LISTENERS="http://0.0.0.0:8083" \
-e CONNECT_HEAP_OPTS="-Xmx512M -Xms512M" \
-e CONNECT_CONSUMER_REQUEST_TIMEOUT_MS=20000 \
-e CONNECT_SECURITY_PROTOCOL="SASL_SSL" \
-e CONNECT_SASL_MECHANISM="PLAIN" \
-e CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE="USERINFO" \
-e CONNECT_PRODUCER_SECURITY_PROTOCOL="SASL_SSL" \
-e CONNECT_CONSUMER_SECURITY_PROTOCOL="SASL_SSL" \
-e CONNECT_CONSUMER_SASL_MECHANISM="PLAIN" \
-e CONNECT_CONSUMER_RETRY_BACKOFF_MS=500 \
-e AWS_DEFAULT_REGION="us-east-1" \
-e AWS_ACCESS_KEY_ID="xxxxx" \
-e AWS_SECRET_ACCESS_KEY="xxxx" \
-e CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";" \
lambda-sink
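Looking at the stack trace, the producer that fails to construct seems to be the one created for the dead letter queue (DeadLetterQueueReporter.createAndSetup), and since the root cause comes from KerberosLogin, I suspect that producer falls back to the default sasl.mechanism (GSSAPI/Kerberos) because I only set CONNECT_PRODUCER_SECURITY_PROTOCOL and no producer-side mechanism or JAAS config. My guess (untested) is that the run command also needs producer/consumer-side SASL settings like these:

```
# Guess: without these, the internal producer/consumer clients default to
# sasl.mechanism=GSSAPI (Kerberos), which requires a serviceName and fails.
-e CONNECT_PRODUCER_SASL_MECHANISM="PLAIN" \
-e CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";" \
-e CONNECT_CONSUMER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";" \
```

Does that sound right, or am I missing something else?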
Here’s my LambdaSink Connector config:
{
  "name": "LambdaSinkConnectorPull",
  "config": {
    "aws.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "aws.access.key.id": "xxx",
    "aws.secret.access.key": "xxxxx",
    "aws.lambda.function.name": "app-prod-pull-gdev-dev-srikanth-prodKafkaConnect",
    "aws.lambda.invocation.type": "sync",
    "aws.lambda.invocation.timeout.ms": "300000",
    "aws.lambda.batch.size": "25",
    "aws.lambda.region": "us-east-1",
    "behavior.on.error": "fail",
    "connector.class": "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
    "confluent.topic.bootstrap.servers": "xxxx:9092",
    "confluent.topic.replication.factor": "3",
    "confluent.topic.security.protocol": "SASL_SSL",
    "confluent.topic.ssl.endpoint.identification.algorithm": "HTTPS",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "confluent.topic.sasl.mechanism": "PLAIN",
    "consumer.override.fetch.min.bytes": "100000",
    "errors.retry.timeout": "30000",
    "errors.retry.delay.max.ms": "10000",
    "errors.tolerance": "none",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.topic.name": "xxxx_GARBAGE",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "6",
    "topics": "xxxx",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
Can you please point out where I am going wrong? I also sense that I am duplicating the SASL config between the worker config and the connector config.
Can you also tell me what the right way to do it is?
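One idea I had for removing the duplication (I’m not sure it’s correct): my understanding is that connector-level client overrides are only read through the producer.override.* / consumer.override.* prefixes (subject to the worker’s connector.client.config.override.policy), so the plain sasl.jaas.config, producer.sasl.jaas.config, and consumer.sasl.jaas.config keys in my connector config may be ignored anyway. If so, I could keep all broker SASL settings in the worker environment and leave only the license-topic client settings in the connector config, roughly:

```json
{
  "confluent.topic.bootstrap.servers": "xxxx:9092",
  "confluent.topic.security.protocol": "SASL_SSL",
  "confluent.topic.sasl.mechanism": "PLAIN",
  "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";"
}
```

Is that the recommended split between worker config and connector config?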
Thanks