Cannot connect to Kafka

I use SSL with a trust store. I previously had network issues (SSL handshake failed), so I checked the certificate chain for the endpoint, imported the certificates into the key store, and also got the certificates for all bootstrap servers.
When connecting now, I still get this error message:

Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512

Hi @aa07181. I’m guessing you’re using a self-managed solution. Are you using Confluent Platform? How are you trying to connect?

I have a provider who gave me a couple of bootstrap servers, a username, and a password. I am connecting using Java.

package client;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public Consumer() {
        super();
    }

    private static String getVariableData(String string) {

        if (string == "bootstrap_servers")
            return "bootstrapserver1.net:9091,bootstrapserver2.net:9092";

        if (string == "group.id.SIKAFKA")
            return "some_group_id";

        if (string == "topic.id.SIKAFKA")
            return "some_topic_id";
			
        if (string == "username")
            return "username";			

        if (string == "password")
            return "password";

        return "";
    }

    public static void main(String[] args) {
        try {

            ConsumerRecords<String, String> records;
            Properties props = new Properties();

            // Server
            props.put("bootstrap.servers", getVariableData("bootstrap_servers"));
            //Consumer group
            props.setProperty("group.id", getVariableData("group.id.SIKAFKA"));
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // Settings for authentication
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "SCRAM-SHA-512");

            props.put("ssl.truststore.location", "location");
            props.put("ssl.truststore.password", "password");
            props.put("ssl.truststore.type", "JKS");

            props.put("ssl.keystore.location", "location");
            props.put("ssl.keystore.password", "password");
            props.put("ssl.keystore.type", "JKS");

            props.put("ssl.endpoint.identification.algorithm", "");
            //props.put("dbms.jvm.additional", "-Djavax.net.debug=ssl:handshake");
            //props.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1,TLSv1,TLSv1.2,TLSv1.3");
            //props.put("ssl.protocol", "TLSv1.3");

            String jaasTemplate =
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
            String jaasCfg = String.format(jaasTemplate, getVariableData("username"), getVariableData("password"));
            props.put("sasl.jaas.config", jaasCfg);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(getVariableData("topic.id.SIKAFKA")));

            System.out.printf("Start!");
            records = consumer.poll(1);
            System.out.printf("Started to poll!");
            while (records.count() > 0) {
                System.out.printf("While start");
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                records = consumer.poll(100);
                System.out.printf("While end");
            }
            System.out.printf("End!");
        } catch (Exception e) {
            // Print the message as plain text; passing it to printf would treat it as a format string.
            System.out.println(e.getMessage());
        }
    }
}

Worth mentioning: you shouldn’t use double equals (==) to compare strings in Java. Use equals() instead.
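To illustrate the point about string comparison, here is a minimal sketch. The class and method names (StringCompareDemo, sameContent, sameReference) are made up for this example and are not part of the original code. It shows that == compares object references while equals() compares the characters. Note that == can appear to work when both sides happen to be the same literal, which makes the problem easy to miss.

```java
// Hypothetical demo class, not taken from the original post.
class StringCompareDemo {

    // Content comparison: true whenever both strings hold the same characters.
    static boolean sameContent(String a, String b) {
        return a.equals(b);
    }

    // Reference comparison: true only when both variables point to the same object.
    static boolean sameReference(String a, String b) {
        return a == b;
    }

    public static void main(String[] args) {
        String literal = "username";
        // new String(...) always creates a distinct object with the same content.
        String runtime = new String("username");

        System.out.println(sameContent(literal, runtime));   // prints "true"
        System.out.println(sameReference(literal, runtime)); // prints "false"
    }
}
```

Any string that is read from a file, a network response, or user input behaves like the runtime value here, so equals() is the safe choice.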

So, no ideas from Kafka experts?

I don’t think Kafka is the actual problem.

Write unit tests for your getVariableData method. I think it’s returning empty strings because you’re not comparing strings correctly, so it falls through to return "". Your authentication then doesn’t get the correct username or password and fails.
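A rough sketch of what such a test could look like is below. It assumes a rewritten getVariableData that uses a switch on the string (which compares by content) and uses plain checks instead of a test framework such as JUnit to stay self-contained. The class name VariableData and the check helper are hypothetical. The returned values are the placeholders from the posted code.

```java
// Hypothetical rewrite of the lookup from the post, plus simple checks.
class VariableData {

    // A switch on a String matches by content (like equals), not by reference.
    static String getVariableData(String key) {
        switch (key) {
            case "bootstrap_servers":
                return "bootstrapserver1.net:9091,bootstrapserver2.net:9092";
            case "group.id.SIKAFKA":
                return "some_group_id";
            case "topic.id.SIKAFKA":
                return "some_topic_id";
            case "username":
                return "username";
            case "password":
                return "password";
            default:
                return "";
        }
    }

    // Minimal stand-in for a test framework assertion.
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        // Build the keys at runtime so the checks do not depend on literal identity.
        String userKey = new StringBuilder("user").append("name").toString();
        String passKey = new String("password");

        check("username".equals(getVariableData(userKey)), "username lookup returned wrong value");
        check("password".equals(getVariableData(passKey)), "password lookup returned wrong value");
        check(getVariableData("no.such.key").isEmpty(), "unknown key should return an empty string");

        System.out.println("all checks passed");
    }
}
```

If a check like the first one fails against the real method, the JAAS config is being built with an empty username or password, which would explain the SCRAM authentication error.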

A better approach would be to keep the settings in an external properties file and read them with Properties.load.
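As a minimal sketch of the Properties.load approach: the class name ConfigLoader and the sample contents are assumptions for illustration. In a real client the Reader would come from a file on disk (for example a client.properties file, a hypothetical name) instead of an in-memory string.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the original post.
class ConfigLoader {

    // Reads key=value pairs from the given source into a Properties object.
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            // Rethrow unchecked so callers do not have to declare IOException.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Stand-in for the contents of an external properties file.
        String sample =
              "bootstrap.servers=bootstrapserver1.net:9091,bootstrapserver2.net:9092\n"
            + "group.id=some_group_id\n"
            + "security.protocol=SASL_SSL\n"
            + "sasl.mechanism=SCRAM-SHA-512\n";

        Properties props = load(new StringReader(sample));
        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in place of the one built by hand, which removes the need for a lookup method like getVariableData.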