I managed to successfully set up Zookeeper and Kafka servers, and CLI pub/sub operations work fine:
nadchel@LAPTOP-CKP86E2O:/usr/local/kafka/bin$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/local/kafka/tools/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/kafka/trogdor/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/kafka/connect/runtime/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/kafka/connect/mirror/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
>Another message
>Yet another message
>^C
nadchel@LAPTOP-CKP86E2O:/usr/local/kafka/bin$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/kafka/tools/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/kafka/trogdor/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/kafka/connect/runtime/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/kafka/connect/mirror/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
Another message
Yet another message
^CProcessed a total of 2 messages
However, when I try to perform similar operations from a simple Java app, it can’t connect. Here’s the app code:
package org.example.demos.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerMicroservice {
private static final String TOPIC_NAME = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// Configure Kafka producer
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Generate and send messages
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
producer.send(record);
}
// Close the producer
producer.close();
}
}
package org.example.demos.kafka;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerMicroservice {
private static final String TOPIC_NAME = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// Configure Kafka consumer
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "my-group");
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
// Subscribe to the topic and start consuming messages
consumer.subscribe(Collections.singleton(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("Received message: " + record.value());
});
}
}
}
}
A snippet from a producer log:
---- 07 сент. 2023 21:47:49,390
DEBUG [kafka-producer-network-thread | producer-1] : [Producer clientId=producer-1] Connection with localhost/127.0.0.1 (channelId=-1) disconnected
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.Net.pollConnect(Native Method) ~[?:?]
at sun.nio.ch.Net.pollConnectNow(Net.java:672) ~[?:?]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946) ~[?:?]
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-3.5.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:224) ~[kafka-clients-3.5.1.jar:?]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:526) [kafka-clients-3.5.1.jar:?]
at org.apache.kafka.common.network.Selector.poll(Selector.java:481) [kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:571) [kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42) [kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:64) [kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:534) [kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:455) [kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) [kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) [kafka-clients-3.5.1.jar:?]
at java.lang.Thread.run(Thread.java:833) [?:?]
---- 07 сент. 2023 21:47:49,390
INFO [kafka-producer-network-thread | producer-1] : [Producer clientId=producer-1] Node -1 disconnected.
---- 07 сент. 2023 21:47:49,390
WARN [kafka-producer-network-thread | producer-1] : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
---- 07 сент. 2023 21:47:49,390
WARN [kafka-producer-network-thread | producer-1] : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
---- 07 сент. 2023 21:47:49,391
DEBUG [kafka-producer-network-thread | producer-1] : [Producer clientId=producer-1] Disconnect from localhost:9092 (id: -1 rack: null) while trying to send request InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1). Going to back off and retry.
java.io.IOException: Connection to localhost:9092 (id: -1 rack: null) failed.
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70) ~[kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:534) ~[kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:455) [kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) [kafka-clients-3.5.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) [kafka-clients-3.5.1.jar:?]
at java.lang.Thread.run(Thread.java:833) [?:?]
---- 07 сент. 2023 21:47:49,492
DEBUG [kafka-producer-network-thread | producer-1] : [Producer clientId=producer-1] Enqueuing transactional request InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1)
---- 07 сент. 2023 21:47:49,492
DEBUG [kafka-producer-network-thread | producer-1] : [Producer clientId=producer-1] Enqueuing transactional request InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1)
---- 07 сент. 2023 21:47:49,492
DEBUG [kafka-producer-network-thread | producer-1] : [Producer clientId=producer-1] Give up sending metadata request since no node is available
I made sure that:
- Kafka versions are compatible (both are 3.5.1)
- I specified these properties in the
server.properties
file:
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
It’s all run on the same machine (a laptop)
What may be the problem?