Cannot connect to Confluent Cloud topic from Java Spring Boot client

Hi all.

I’m new to Confluent Cloud and I’m trying to send a simple event from a Java Spring Boot client to a Confluent Cloud topic. I followed some tutorials from the Confluent site and some sample code from GitHub, but I can’t get it to work.

I always receive a log message like this:

org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker xxx-yyyyy.us-east1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected

My application.properties is:

spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-4yyd6.us-east1.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<API SECRET>';
spring.kafka.properties.security.protocol=SASL_SSL

And my Kafka configuration class is:

@Configuration
public class KafkaConfiguration {

  Logger log = LoggerFactory.getLogger(getClass());

  @Autowired
  private Environment env;

  // Constants below are statically imported from org.apache.kafka.clients.producer.ProducerConfig.
  @Bean
  public ProducerFactory<Integer, String> producerFactory() {
    String kafkaServer = env.getProperty("spring.kafka.properties.bootstrap.servers");
    log.info("Kafka Server: " + kafkaServer);
    return new DefaultKafkaProducerFactory<>(Map.of(
        BOOTSTRAP_SERVERS_CONFIG, kafkaServer,
        RETRIES_CONFIG, 0,
        BATCH_SIZE_CONFIG, 16384,
        LINGER_MS_CONFIG, 1,
        BUFFER_MEMORY_CONFIG, 33554432,
        KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class,
        VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
  }

  @Bean
  public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
}
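
One thing I’m unsure about: since I build the config map myself, I don’t know whether the spring.kafka.properties.* security settings above ever reach the producer; as far as I understand, Spring Boot’s auto-configured factory backs off once I declare my own. This is a sketch of the variant I’m considering, copying those properties into the map explicitly (same Environment lookups, same property names as in my application.properties):

// Sketch only: pass the SASL/SSL settings into the hand-built map, since
// Map.of(...) above contains nothing but the entries listed there.
@Bean
public ProducerFactory<Integer, String> producerFactory() {
  Map<String, Object> props = new HashMap<>();
  props.put(BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.properties.bootstrap.servers"));
  props.put(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
  props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put("security.protocol", env.getProperty("spring.kafka.properties.security.protocol"));
  props.put("sasl.mechanism", env.getProperty("spring.kafka.properties.sasl.mechanism"));
  props.put("sasl.jaas.config", env.getProperty("spring.kafka.properties.sasl.jaas.config"));
  return new DefaultKafkaProducerFactory<>(props);
}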

Finally, this is the producer class:

@Component
class Producer {

  @Autowired
  private KafkaTemplate<Integer, String> template;

  Faker faker;

  @EventListener(ApplicationStartedEvent.class)
  public void generate() {
    faker = Faker.instance();

    final Flux<Long> interval = Flux.interval(Duration.ofMillis(1_000));
    final Flux<String> quotes = Flux.fromStream(Stream.generate(() -> faker.hobbit().quote()));

    // Once per second, send a random hobbit quote to the "test" topic with a random integer key.
    Flux.zip(interval, quotes)
        .map(it -> template.send("test", faker.random().nextInt(42), it.getT2()))
        .blockLast();
  }
}

I appreciate your help with this.

Have you tried setting this property?

ssl.endpoint.identification.algorithm=https
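
In a Spring Boot application.properties, that should presumably go under the same prefix as your other client settings:

spring.kafka.properties.ssl.endpoint.identification.algorithm=https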


Here is my working config file.

bootstrap.servers=<ccloud kafka>:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USER>" password="<PASSWORD>";
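
If you are using Spring Boot’s auto-configuration rather than loading this file directly, the same settings should map onto the spring.kafka.properties.* prefix (a sketch; substitute your own bootstrap server and credentials):

spring.kafka.properties.bootstrap.servers=<ccloud kafka>:9092
spring.kafka.properties.ssl.endpoint.identification.algorithm=https
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USER>" password="<PASSWORD>";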

I am also facing the same problem. I created a Confluent Cloud cluster and started a Spring Boot service with the Kafka config, but I am not able to send data using KafkaTemplate’s send() with a simple string message.
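
For reference, this is roughly the kind of send I mean (a minimal sketch; the topic name and message are placeholders):

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendSample() {
    // Plain string message to a test topic; this is the call that never reaches the cluster for me.
    kafkaTemplate.send("my-topic", "hello from spring boot");
}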

Hey guys.
I am facing the same issue when I run my Spring Boot application with a @KafkaListener consuming protobuf messages from a Confluent topic. Locally it works fine, but not against the Confluent Cloud environment.
Has anyone else here fixed this?
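
For reference, the listener is shaped roughly like this (a sketch; the topic, group id, and MyProtoMessage, the protobuf-generated class, stand in for my real names):

@KafkaListener(topics = "my-protobuf-topic", groupId = "my-group")
public void consume(MyProtoMessage message) {
    // Works against my local broker, but with the Confluent Cloud cluster
    // the client disconnects at bootstrap as described above.
    System.out.println("Received: " + message);
}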

@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class);
    // SASL/SSL settings added directly to the factory map:
    configProps.put("sasl.mechanism", "PLAIN");
    configProps.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='XXXXXX' password='XXXXXXXX';");
    configProps.put("security.protocol", "SASL_SSL");
    return new DefaultKafkaProducerFactory<>(configProps);
}