Hi everybody. I ran an experiment on Kafka replication using docker-compose. I created a 3-node Kafka cluster and three networks, one for each pair of nodes: one network that node 1 and node 2 are attached to, another for node 1 and node 3, and another for node 2 and node 3. I checked, and the controller was node 3. I created a topic with a replication factor of 3 and 1 partition. The partition leader was also node 3, and nodes 1 and 2 each held one replica of the partition. Everything was fine.

Then I disconnected node 1 from the network shared by node 1 and node 3. I expected node 1 to be removed from the ISR. However, it was constantly being removed from the topic's ISR and then added again. Any idea what is happening?
Docker Compose file:
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    networks:
      zookeeper1:
      zookeeper2:
      zookeeper3:
  kafka-1:
    container_name: kafka-1
    hostname: kafka-1
    restart: always
    image: "confluentinc/cp-kafka:7.3.2"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9093
      KAFKA_BROKER_ID: 1
      KAFKA_JMX_HOSTNAME: kafka-1
      KAFKA_JMX_PORT: 50400
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_LOG_CLEANUP_POLICY: delete
      KAFKA_LOG_RETENTION_MINUTES: 21600
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1
    networks:
      node12:
      node13:
      zookeeper1:
  kafka-2:
    container_name: kafka-2
    hostname: kafka-2
    restart: always
    image: "confluentinc/cp-kafka:7.3.2"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9093
      KAFKA_BROKER_ID: 2
      KAFKA_JMX_HOSTNAME: kafka-2
      KAFKA_JMX_PORT: 50400
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_LOG_CLEANUP_POLICY: delete
      KAFKA_LOG_RETENTION_MINUTES: 21600
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1
    networks:
      node23:
      node12:
      zookeeper2:
  kafka-3:
    container_name: kafka-3
    hostname: kafka-3
    restart: always
    image: "confluentinc/cp-kafka:7.3.2"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9093
      KAFKA_BROKER_ID: 3
      KAFKA_JMX_HOSTNAME: kafka-3
      KAFKA_JMX_PORT: 50400
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_LOG_CLEANUP_POLICY: delete
      KAFKA_LOG_RETENTION_MINUTES: 21600
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1
    networks:
      node13:
      node23:
      zookeeper3:
networks:
  node12:
  node23:
  node13:
  zookeeper1:
  zookeeper2:
  zookeeper3:
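To make the topology easier to reason about, here is a small Python sketch that models the network membership from the compose file above and works out which broker pairs still share a network after kafka-1 is detached from node13. The helper names (connected_pairs, disconnect) are made up for illustration, and the model assumes two containers can reach each other only if they share at least one Docker network. It is only a sketch of the setup, not a description of what Kafka does internally.

```python
from itertools import combinations

# Network membership copied from the compose file above (brokers only).
MEMBERSHIP = {
    "kafka-1": {"node12", "node13", "zookeeper1"},
    "kafka-2": {"node23", "node12", "zookeeper2"},
    "kafka-3": {"node13", "node23", "zookeeper3"},
}


def connected_pairs(membership):
    """Return the set of broker pairs that share at least one network."""
    return {
        (a, b)
        for a, b in combinations(sorted(membership), 2)
        if membership[a] & membership[b]
    }


def disconnect(membership, container, network):
    """Return a copy of the membership with `container` detached from `network`."""
    updated = {name: set(nets) for name, nets in membership.items()}
    updated[container].discard(network)
    return updated


before = connected_pairs(MEMBERSHIP)
after = connected_pairs(disconnect(MEMBERSHIP, "kafka-1", "node13"))

print("before:", sorted(before))
print("after: ", sorted(after))
print("lost:  ", sorted(before - after))
```

Under that assumption, the only pair that loses connectivity is kafka-1 and kafka-3, which matches the UnknownHostException reported on both sides in the logs below. kafka-1 keeps its links to kafka-2 (node12) and to the zookeeper container (zookeeper1).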
Node 3 is constantly generating this log:
kafka-3 | [2025-04-03 19:19:06,174] WARN [RequestSendThread controllerId=3] Controller 3's connection to broker kafka-1:9093 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
kafka-3 | java.io.IOException: Connection to kafka-1:9093 (id: 1 rack: null) failed.
kafka-3 | at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
kafka-3 | at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:292)
kafka-3 | at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:246)
kafka-3 | at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
kafka-3 | [2025-04-03 19:19:06,274] WARN [Controller id=3, targetBrokerId=1] Error connecting to node kafka-1:9093 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
kafka-3 | java.net.UnknownHostException: kafka-1
kafka-3 | at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
kafka-3 | at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1524)
kafka-3 | at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1382)
kafka-3 | at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
kafka-3 | at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
kafka-3 | at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
kafka-3 | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:510)
kafka-3 | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
kafka-3 | at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
kafka-3 | at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:990)
kafka-3 | at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301)
kafka-3 | at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:64)
kafka-3 | at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:292)
kafka-3 | at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:246)
kafka-3 | at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
kafka-3 | [2025-04-03 19:19:06,274] WARN [RequestSendThread controllerId=3] Controller 3's connection to broker kafka-1:9093 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
kafka-3 | java.io.IOException: Connection to kafka-1:9093 (id: 1 rack: null) failed.
kafka-3 | at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
kafka-3 | at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:292)
kafka-3 | at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:246)
kafka-3 | at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
kafka-3 | [2025-04-03 19:19:06,374] WARN [Controller id=3, targetBrokerId=1] Error connecting to node kafka-1:9093 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
kafka-3 | java.net.UnknownHostException: kafka-1
kafka-3 | at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
kafka-3 | at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1524)
kafka-3 | at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1382)
kafka-3 | at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
kafka-3 | at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
kafka-3 | at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
kafka-3 | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:510)
kafka-3 | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
kafka-3 | at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
kafka-3 | at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:990)
kafka-3 | at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301)
kafka-3 | at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:64)
kafka-3 | at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:292)
kafka-3 | at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:246)
kafka-3 | at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
kafka-3 | [2025-04-03 19:19:06,375] WARN [RequestSendThread controllerId=3] Controller 3's connection to broker kafka-1:9093 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
And node 1 is generating this log:
kafka-1 | [2025-04-03 19:20:02,587] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error connecting to node kafka-3:9093 (id: 3 rack: null) (org.apache.kafka.clients.NetworkClient)
kafka-1 | java.net.UnknownHostException: kafka-3
kafka-1 | at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
kafka-1 | at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1524)
kafka-1 | at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1382)
kafka-1 | at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
kafka-1 | at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
kafka-1 | at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
kafka-1 | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:510)
kafka-1 | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
kafka-1 | at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
kafka-1 | at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:990)
kafka-1 | at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301)
kafka-1 | at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:64)
kafka-1 | at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
kafka-1 | at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:78)
kafka-1 | at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:309)
kafka-1 | at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:124)
kafka-1 | at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:123)
kafka-1 | at scala.Option.foreach(Option.scala:437)
kafka-1 | at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:123)
kafka-1 | at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
kafka-1 | at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
kafka-1 | [2025-04-03 19:20:02,587] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={foo-1-0=PartitionData(topicId=Vcabd4gSSHy0KuSbI-F60w, fetchOffset=4, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=214584047, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
kafka-1 | java.io.IOException: Connection to kafka-3:9093 (id: 3 rack: null) failed.
kafka-1 | at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
kafka-1 | at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
kafka-1 | at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:78)
kafka-1 | at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:309)
kafka-1 | at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:124)
kafka-1 | at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:123)
kafka-1 | at scala.Option.foreach(Option.scala:437)
kafka-1 | at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:123)
kafka-1 | at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
kafka-1 | at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)