Here’s a limitation Eureka has: it doesn’t have out-of-the-box EurekaServiceRegisteredEvent
. I want to create one. I also want it to send every such event to Kafka for other services to process. I also want to use Stream Cloud Stream Kafka. Spring’s user guide is patchy and doesn’t provide a full picture. I also couldn’t find examples of Kafka producers (written using Spring Cloud Stream) on the internet
EurekaServiceCanceledEvent
would be nice to have too
@Component
public class EurekaApplicationListener {
@EventListener
public void onEurekaInstanceRegisteredEvent(EurekaInstanceRegisteredEvent event) {
if (!event.isReplication()) {
EurekaServiceRegisteredEvent serviceRegisteredEvent = new EurekaServiceRegisteredEvent(event.getInstanceInfo().getAppName());
// send to Kafka topic "service-registered-event"
}
}
@EventListener
public void onEurekaInstanceCanceledEvent(EurekaInstanceCanceledEvent event) {
if (!event.isReplication()) {
EurekaServiceCanceledEvent serviceCanceledEvent = new EurekaServiceCanceledEvent(event.getAppName());
// send to Kafka topic "service-canceled-event"
}
}
}
@NoArgsConstructor
@Getter
@Setter
public class EurekaServiceRegisteredEvent {
private String registeredServiceName;
public EurekaServiceRegisteredEvent(String registeredServiceName) {
this.registeredServiceName = registeredServiceName;
}
}
@NoArgsConstructor
@Getter
@Setter
public class EurekaServiceCanceledEvent {
private String canceledServiceName;
public EurekaServiceCanceledEvent(String canceledServiceName) {
this.canceledServiceName = canceledServiceName;
}
}
<!-- I guess this should be enough -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
How do I write such a basic producer?
Or maybe you could argue it’s an XY problem, and I in fact don’t need Spring Cloud Stream for this? I know I can use plain Kafka, but I fugured it would be easier and more declarative with an extra abstraction layer on top (as with Spring Data)
Anyway, I tried using the plain Spring Kafka
package com.example.eurekaserver.config.kafka;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic eurekaServiceRegistered() {
return new NewTopic("service-registered", 1, (short) 1);
}
@Bean
public NewTopic eurekaServiceCanceled() {
return new NewTopic("service-canceled", 1, (short) 1);
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package com.example.eurekaserver.listener;
import com.example.eurekaserver.event.ServiceCanceledEvent;
import com.example.eurekaserver.event.ServiceRegisteredEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceCanceledEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceRegisteredEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class EurekaApplicationListener {
private final KafkaTemplate<String, Object> kafkaTemplate;
@EventListener
public void onEurekaInstanceRegisteredEvent(EurekaInstanceRegisteredEvent event) {
if (!event.isReplication()) {
ServiceRegisteredEvent serviceRegisteredEvent = new ServiceRegisteredEvent(event.getInstanceInfo().getAppName());
kafkaTemplate.send("service-registered", serviceRegisteredEvent);
}
}
@EventListener
public void onEurekaInstanceCanceledEvent(EurekaInstanceCanceledEvent event) {
if (!event.isReplication()) {
ServiceCanceledEvent serviceCanceledEvent = new ServiceCanceledEvent(event.getAppName());
kafkaTemplate.send("service-canceled", serviceCanceledEvent);
}
}
}
server:
port: 8761
eureka:
server:
enable-self-preservation: false
client:
register-with-eureka: false
fetch-registry: false
spring:
kafka:
bootstrap-servers: localhost:9092
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.1</version>
</dependency>
Here’s how I started a Kafka server:
version: "3"
services:
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://localhost:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
The server started up ok, but my app (which is not run within any Docker container) can’t connect to it:
2023-12-26T21:58:00.137+03:00 INFO 1728 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
2023-12-26T21:58:00.130+03:00 ERROR 1728 --- [ main] o.springframework.kafka.core.KafkaAdmin : Could not configure topics
org.springframework.kafka.KafkaException: Timed out waiting to get existing topics
I wonder what’s wrong