Cannot connect to Kafka brokers over SSL using the Elasticsearch sink connector

Hello. I have a problem establishing a connection to the Kafka brokers when running Kafka Connect in standalone mode with the Elasticsearch Sink Connector. I have both connect-standalone.properties and elastic.properties set up like this:

connect-standalone.properties:

    bootstrap.servers=broker1:9094,broker2:9094
    group.id=connect-cluster
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    allow.auto.create.topics=false

    # SSL configurations
    security.protocol=SSL
    ssl.keystore.location=/path/to/keystore.jks
    ssl.keystore.password=xxx
    ssl.key.password=xxx

    offset.storage.file.filename=/etc/kafka/connect.offsets
    listeners=http://0.0.0.0:8084
    plugin.path=/usr/share/java,/usr/share/confluent-hub-components

Now elastic.properties:

    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=topic
    key.ignore=true
    connection.url=https://elasticsearchurl.com:443

    # Elasticsearch authentication
    connection.username=login
    connection.password=xxx

    # Kafka Bootstrap servers with SSL
    consumer.bootstrap.servers=broker1:9094,broker2:9094

    # SSL configurations for Kafka Connect and Elasticsearch Sink Connector
    security.protocol=SSL
    ssl.keystore.location=/path/to/keystore.jks
    ssl.keystore.password=xxx
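
For completeness, the worker is started with connect-standalone, roughly like this (the exact script depends on the installation; Apache Kafka ships bin/connect-standalone.sh, while Confluent Platform provides an equivalent connect-standalone wrapper):

    # Standalone worker: first argument is the worker config,
    # remaining arguments are connector configs
    bin/connect-standalone.sh connect-standalone.properties elastic.properties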

Everything looks good until the last step, where these properties are applied to the consumer config:

INFO ConsumerConfig values:

    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [broker1:9094,broker2:9094]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = connector-consumer-elasticsearch-sink-0
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = connect-elasticsearch-sink
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 45000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

(org.apache.kafka.clients.consumer.ConsumerConfig)

The most important thing in this consumer config is that the parameters ssl.keystore.location and ssl.keystore.password are both null, and I do not know how to pass them into this config. I need those two parameters to be the same as they are in, for example, the AdminClientConfig logged when this sink connector starts:

INFO AdminClientConfig values:

    bootstrap.servers = [broker1:9094,broker2:9094]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = [hidden]
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = /path/to/keystore.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

(org.apache.kafka.clients.admin.AdminClientConfig)

As you can see, it shows exactly the same keystore path, and the password (hidden), as I set in connect-standalone.properties. I even put the path to this keystore inside elastic.properties, but with no result. Thanks.

Prefix any consumer properties with consumer., e.g., specify consumer.ssl.keystore.location and consumer.ssl.keystore.password in the worker config (docs here).
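
Here is a minimal sketch of the relevant worker-config section with that prefix applied, reusing the placeholder values from the question; a consumer.security.protocol line is included as well, since the ConsumerConfig above still shows security.protocol = PLAINTEXT:

    # Worker's own clients (this part was already working, per the AdminClientConfig)
    security.protocol=SSL
    ssl.keystore.location=/path/to/keystore.jks
    ssl.keystore.password=xxx
    ssl.key.password=xxx

    # Same settings again with the consumer. prefix, so the sink task's
    # consumer inherits them instead of defaulting to PLAINTEXT
    consumer.security.protocol=SSL
    consumer.ssl.keystore.location=/path/to/keystore.jks
    consumer.ssl.keystore.password=xxx
    consumer.ssl.key.password=xxx

For source connectors, the same idea applies with the producer. prefix.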
