Eureka server sending messages to Kafka using Spring Cloud Stream

Here’s a limitation Eureka has: it doesn’t have out-of-the-box EurekaServiceRegisteredEvent. I want to create one. I also want it to send every such event to Kafka for other services to process. I also want to use Stream Cloud Stream Kafka. Spring’s user guide is patchy and doesn’t provide a full picture. I also couldn’t find examples of Kafka producers (written using Spring Cloud Stream) on the internet

EurekaServiceCanceledEvent would be nice to have too

@Component
public class EurekaApplicationListener {
    @EventListener
    public void onEurekaInstanceRegisteredEvent(EurekaInstanceRegisteredEvent event) {
        if (!event.isReplication()) {
            EurekaServiceRegisteredEvent serviceRegisteredEvent = new EurekaServiceRegisteredEvent(event.getInstanceInfo().getAppName());
            // send to Kafka topic "service-registered-event"
        }
    }

    @EventListener
    public void onEurekaInstanceCanceledEvent(EurekaInstanceCanceledEvent event) {
        if (!event.isReplication()) {
            EurekaServiceCanceledEvent serviceCanceledEvent = new EurekaServiceCanceledEvent(event.getAppName());
            // send to Kafka topic "service-canceled-event"
        }
    }
}
@NoArgsConstructor
@Getter
@Setter
public class EurekaServiceRegisteredEvent {
    private String registeredServiceName;

    public EurekaServiceRegisteredEvent(String registeredServiceName) {
        this.registeredServiceName = registeredServiceName;
    }
}
@NoArgsConstructor
@Getter
@Setter
public class EurekaServiceCanceledEvent {
    private String canceledServiceName;

    public EurekaServiceCanceledEvent(String canceledServiceName) {
        this.canceledServiceName = canceledServiceName;
    }
}
<!-- I guess this should be enough -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

How do I write such a basic producer?

Or maybe you could argue it’s an XY problem, and I in fact don’t need Spring Cloud Stream for this? I know I can use plain Kafka, but I fugured it would be easier and more declarative with an extra abstraction layer on top (as with Spring Data)

Anyway, I tried using the plain Spring Kafka

package com.example.eurekaserver.config.kafka;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic eurekaServiceRegistered() {
        return new NewTopic("service-registered", 1, (short) 1);
    }

    @Bean
    public NewTopic eurekaServiceCanceled() {
        return new NewTopic("service-canceled", 1, (short) 1);
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
package com.example.eurekaserver.listener;

import com.example.eurekaserver.event.ServiceCanceledEvent;
import com.example.eurekaserver.event.ServiceRegisteredEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceCanceledEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceRegisteredEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class EurekaApplicationListener {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    @EventListener
    public void onEurekaInstanceRegisteredEvent(EurekaInstanceRegisteredEvent event) {
        if (!event.isReplication()) {
            ServiceRegisteredEvent serviceRegisteredEvent = new ServiceRegisteredEvent(event.getInstanceInfo().getAppName());
            kafkaTemplate.send("service-registered", serviceRegisteredEvent);
        }
    }

    @EventListener
    public void onEurekaInstanceCanceledEvent(EurekaInstanceCanceledEvent event) {
        if (!event.isReplication()) {
            ServiceCanceledEvent serviceCanceledEvent = new ServiceCanceledEvent(event.getAppName());
            kafkaTemplate.send("service-canceled", serviceCanceledEvent);
        }
    }
}
server:
  port: 8761

eureka:
  server:
    enable-self-preservation: false
  client:
    register-with-eureka: false
    fetch-registry: false
spring:
  kafka:
    bootstrap-servers: localhost:9092
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.1.1</version>
        </dependency>

Here’s how I started a Kafka server:

version: "3"
services:
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://localhost:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

The server started up ok, but my app (which is not run within any Docker container) can’t connect to it:

2023-12-26T21:58:00.137+03:00  INFO 1728 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata

2023-12-26T21:58:00.130+03:00 ERROR 1728 --- [           main] o.springframework.kafka.core.KafkaAdmin  : Could not configure topics

org.springframework.kafka.KafkaException: Timed out waiting to get existing topics

I wonder what’s wrong