Hi all.
I’m new in Confluent Cloud and I’m trying to send a simple event from a Java Spring Boot client to a Confluent Cloud topic. I saw some tutorial from Confluent site and some source code from github, however I can’t get it to work.
I always receive a log message like this:
org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker xxx-yyyyy.us-east1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
My application.properties is:
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-4yyd6.us-east1.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<API SECRET>';
spring.kafka.properties.security.protocol=SASL_SSL
And my Kafka configuration class is:
@Configuration
public class KafkaConfiguration {
Logger log = LoggerFactory. *getLogger* (getClass());
@Autowired
private Environment env;
@Bean
public ProducerFactory<Integer, String> producerFactory() {
String kafkaServer = env.getProperty("spring.kafka.properties.bootstrap.servers");
log.info("Kafka Server: " + kafkaServer);
return new DefaultKafkaProducerFactory<>(Map.of (
BOOTSTRAP_SERVERS_CONFIG, kafkaServer,
RETRIES_CONFIG , 0,
BATCH_SIZE_CONFIG , 16384,
LINGER_MS_CONFIG , 1,
BUFFER_MEMORY_CONFIG , 33554432,
KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class,
VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
);
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Finaly, this is the producer class:
@Component
class Producer {
@Autowired
private KafkaTemplate<Integer, String> template;
Faker faker;
@EventListener(ApplicationStartedEvent.class)
public void generate() {
faker = Faker.instance();
final Flux<Long> interval = Flux.interval(Duration.ofMillis (1_000));
final Flux<String> quotes = Flux.fromStream(Stream.generate(() -> faker.hobbit().quote()));
Flux.zip(interval, quotes)
.map(it -> template.send("test", faker.random().nextInt(42), it.getT2())).blockLast();
}
}
I appreciate your help with this.