Connecting to Confluent Cloud using a load balancer

I am looking for some help with an issue in my setup. Allow me to explain. I have a load balancer in front of a REST Proxy, and the REST Proxy connects to a Confluent Cloud cluster at the back. Using the DNS record of the load balancer, I can query the Confluent Cloud cluster and make API calls such as POST/GET for topics. So API calls like the one shown below work.

> $ curl -X GET "http://alb-ABC123-512201843.us-east-1.elb.amazonaws.com/v3/clusters/"

Now comes the problematic part. I have a vanilla Java producer that I use for producing messages, and I am trying to use it to send messages to a topic in the Confluent Cloud cluster. But I run into issues when I try to send messages with this producer.

[2021-04-04 19:14:16,654] INFO ProducerConfig values: 
        acks = 1
        batch.size = 16384
        bootstrap.servers = [alb-ABC123-512201843.us-east-1.elb.amazonaws.com:80]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        internal.auto.downgrade.txn.commit = false
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 20000
        retries = 2147483647
        retry.backoff.ms = 500
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = PLAIN
        security.protocol = SASL_PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
 (org.apache.kafka.clients.producer.ProducerConfig)
[2021-04-04 19:14:16,792] INFO Successfully logged in. (org.apache.kafka.common.security.authenticator.AbstractLogin)
[2021-04-04 19:14:16,818] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-04-04 19:14:16,818] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.common.utils.AppInfoParser)
[2021-04-04 19:14:16,818] INFO Kafka startTimeMs: 1617520456816 (org.apache.kafka.common.utils.AppInfoParser)
Producing record: alice 0
[2021-04-04 19:14:18,526] WARN [Producer clientId=producer-1] Connection to node -1 (alb-ABC123-512201843.us-east-1.elb.amazonaws.com/35.174.235.156:80) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)

The load balancer doesn’t have a certificate, and I have tried both the SASL_PLAINTEXT and SASL_PLAIN settings in the configuration file, but to no avail.
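For reference, the producer is essentially the following. This is a minimal sketch reconstructed from the ProducerConfig log above; the topic name test-topic, the key/value pair, and the credentials are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Bootstrap against the ALB's DNS name on port 80, as in the log above.
        props.put("bootstrap.servers",
                "alb-ABC123-512201843.us-east-1.elb.amazonaws.com:80");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"<API_KEY>\" password=\"<API_SECRET>\";");
        props.put("acks", "1");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Matches "Producing record: alice 0" in the log; key and value
            // are placeholders.
            System.out.println("Producing record: alice 0");
            producer.send(new ProducerRecord<>("test-topic", "alice", "0"));
        }
    }
}
```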

Can I please get some help here? I think I am close, but I don’t know where to go from here.

Perhaps I’m misunderstanding you. If the load balancer is in front of the REST Proxy, then you connect to it using HTTP calls, not the Producer API.


I should’ve written the problem better, but I guess I was exhausted from looking for a resolution. I was actually looking for a producer that can work with the REST Proxy. The simple producer I have works well, but it obviously isn’t built for making HTTP calls. Is there an example showing a producer that can work with the REST Proxy to produce messages? For example, how can I define the header values in a simple producer to make it work?
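A native KafkaProducer can’t talk to the REST Proxy, but any HTTP client can. Below is a minimal sketch using Java’s built-in HttpClient (Java 11+) against the REST Proxy v2 produce endpoint; the ALB hostname reuses the value from this thread, and the topic name test-topic and the record payload are placeholders.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestProxyProduce {
    public static void main(String[] args) throws Exception {
        // REST Proxy v2 produce endpoint behind the ALB; "test-topic" is a placeholder.
        String url = "http://alb-ABC123-512201843.us-east-1.elb.amazonaws.com"
                + "/topics/test-topic";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                // Content-Type tells REST Proxy which embedded format the
                // records use (JSON here); Accept sets the response format.
                .header("Content-Type", "application/vnd.kafka.json.v2+json")
                .header("Accept", "application/vnd.kafka.v2+json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"records\":[{\"key\":\"alice\",\"value\":\"0\"}]}"))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

The Content-Type header is the important part: it tells the REST Proxy how to interpret the records. The response body reports the partition and offset of each produced record, so you can confirm delivery without a native client.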

The default security.protocol on Confluent Cloud is SASL_SSL, I think. You should change SASL_PLAINTEXT to SASL_SSL. You can use a client example (like the Python example) to produce or consume messages with security.protocol=SASL_SSL, referring to the tutorials.
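If it helps, the client-side change would look roughly like this. The bootstrap endpoint and credentials below are placeholders; the real values come from the cluster settings and API keys in the Confluent Cloud console.

```java
import java.util.Properties;

public class CloudProducerConfig {
    static Properties cloudProps() {
        Properties props = new Properties();
        // Connect directly to the cluster's bootstrap endpoint (placeholder
        // host), not to the ALB: the Kafka client speaks the Kafka wire
        // protocol, not HTTP.
        props.put("bootstrap.servers",
                "pkc-XXXXX.us-east-1.aws.confluent.cloud:9092");
        props.put("security.protocol", "SASL_SSL"); // TLS + SASL
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"<API_KEY>\" password=\"<API_SECRET>\";");
        return props;
    }
}
```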

And how do you change the security.protocol from SASL_SSL to SASL_PLAINTEXT on Confluent Cloud?