Can't connect to broker in Confluent Cloud

Hi everyone, I’m following this tutorial step by step to configure a springboot application with apache kafka at the confluent cloud -https://developer.confluent.io/learn-kafka/spring/hands-on-consume-messages/-.

When I try to launch the application I get these messages:

INFO 54940 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.
WARN 54940 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (pkc-03vj5.europe-west8.gcp.confluent.cloud/34.154.0.122:9092) could not be established. Broker may not be available.
WARN 54940 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker pkc-03vj5.europe-west8.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected

KafkaConfiguration.java

package io.confluent.developer.spring;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;

@Configuration
public class KafkaConfiguration {


    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(
                Map.of(BOOTSTRAP_SERVERS_CONFIG, "pkc-03vj5.europe-west8.gcp.confluent.cloud:9092",
                        RETRIES_CONFIG, 0,
                        BUFFER_MEMORY_CONFIG, 33554432,
                        KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                        VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
                ));
    }


    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Map<String, Object> consumerProperties() {
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "pkc-03vj5.europe-west8.gcp.confluent.cloud:9092",
                GROUP_ID_CONFIG, "spring-ccloud",
                ENABLE_AUTO_COMMIT_CONFIG, false,
                SESSION_TIMEOUT_MS_CONFIG, 15000,
                KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }
}

SpringCloudApplication.java

package io.confluent.developer.spring;

import com.github.javafaker.Faker;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.stream.Stream;

@SpringBootApplication
public class SpringCcloudApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringCcloudApplication.class, args);
	}

	@RequiredArgsConstructor
	@Component
	class Producer {
		@Autowired
		private KafkaTemplate<String, String> template;

		Faker faker;

		@EventListener(ApplicationStartedEvent.class)
		public void generate() {

			faker = Faker.instance();
			final Flux<Long> interval = Flux.interval(Duration.ofMillis(1_000));

			final Flux<String> quotes = Flux.fromStream(Stream.generate(() -> faker.hobbit().quote()));

			Flux.zip(interval, quotes)
					.map(it -> template.send("hobbit", String.valueOf(faker.random().nextInt(42)), it.getT2())).blockLast();
		}
	}
	@Component
	class Consumer {
		@KafkaListener(topics= {"hobbit"}, groupId="spring-boot-kafka")
		public void consume(String quote) {
			System.out.println("received= " + quote);
		}
	}
}

application.properties

# Required connection configs for Kafka producer, consumer, and admin
spring:
  kafka:
    bootstrap.servers:pkc-03vj5.europe-west8.gcp.confluent.cloud:9092
    properties:
      security:
       protocol:SASL_SSL
        sasl:
         jaas:
          config:org.apache.kafka.common.security.plain.PlainLoginModule required username='X7YF4OI4HP3OW5WV' password='ueiKiWLUhNzDtMtU1BAZGIRqCoaPDeMZ2YSkpcS/uiP3dHNxR7ujTIqUStfZrV1p';
         mechanism:PLAIN
    producer:
      key-serializer:org.apache.kafka.common.serialization.StringSerializer
      value-serializer:org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id:spring-boot-kafka
      auto-offset-reset:earliest
      key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer:org.apache.kafka.common.serialization.StringDeserializer

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.0.0</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>io.confluent.developer</groupId>
	<artifactId>spring-ccloud</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>spring-ccloud</name>
	<description>Demo project for Spring Boot with Confluent Cloud</description>
	<properties>
		<java.version>17</java.version>
		<spring-cloud.version>2022.0.0-RC2</spring-cloud.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
			<scope>test</scope>
			<classifier>test-binder</classifier>
			<type>test-jar</type>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.github.javafaker</groupId>
			<artifactId>javafaker</artifactId>
			<version>1.0.2</version>
		</dependency>
	</dependencies>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>
	<repositories>
		<repository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>https://repo.spring.io/milestone</url>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>

</project>

Hi @SeverusP

just to be sure there is not network issue.
Does a telnet to the cluster work?

Best,
Michael

1 Like

Please edit your post to remove access credentials.

Thanks, this credentials are no longer available, anyway I don’t understand how to edit the post

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.