I am very new in Kafka, and I have been struggling to make my schema registry work.
I am spinning my kafka (single node) and my schema registry up with docker.
My docker yaml file looks as following
…
Kafka single mode
kafka:
image: bitnami/kafka:latest
restart: “no”
links:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
KAFKA_ADVERTISED_HOSTNAME: localhost
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
Kafka schema registry
kafka-schema-registry:
image: bitnami/schema-registry
hostname : localhost
restart: always
ports:
- 8081:8081
depends_on:
- zookeeper
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: “zookeeper:2181”
SCHEMA_REGISTRY_HOST_NAME: localhost
SCHEMA_REGISTRY_LISTENERS: localhost:8081
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: ‘*’
SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 1
SCHEMA_REGISTRY_DEBUG: “true”
I am not sure whether my schema-registry is ‘linked’ with kafka, or how else I can pair them.
I would really appreciate the help. Googling the ‘Failed to get Kafka cluster ID’ error did not help.
Can someone please help me?
The complete error:
2024-01-27 19:07:27 [2024-01-27 18:07:27,132] INFO Adding listener with HTTP/2: NamedURI{uri=http://localhost:8081, name=‘null’} (io.confluent.rest.ApplicationServer:296)
2024-01-27 19:07:28 [2024-01-27 18:07:28,117] INFO Found internal listener: NamedURI{uri=http://localhost:8081, name=‘null’} (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:189)
2024-01-27 19:07:28 [2024-01-27 18:07:28,122] INFO Setting my identity to version=1,host=127.0.0.1,port=8081,scheme=http,leaderEligibility=true,isLeader=false (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:192)
2024-01-27 19:08:28 [2024-01-27 18:08:28,609] ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:79)
2024-01-27 19:08:28 io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException: Failed to get Kafka cluster ID
2024-01-27 19:08:28 at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.kafkaClusterId(KafkaSchemaRegistry.java:1982)
2024-01-27 19:08:28 at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.(KafkaSchemaRegistry.java:209)
2024-01-27 19:08:28 at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:73)
2024-01-27 19:08:28 at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.configureBaseApplication(SchemaRegistryRestApplication.java:101)
2024-01-27 19:08:28 at io.confluent.rest.Application.configureHandler(Application.java:299)
2024-01-27 19:08:28 at io.confluent.rest.ApplicationServer.doStart(ApplicationServer.java:201)
2024-01-27 19:08:28 at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
2024-01-27 19:08:28 at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:44)
2024-01-27 19:08:28 Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2024-01-27 19:08:28 at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
2024-01-27 19:08:28 at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
2024-01-27 19:08:28 at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
2024-01-27 19:08:28 at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.kafkaClusterId(KafkaSchemaRegistry.java:1980)
2024-01-27 19:08:28 … 7 more
2024-01-27 19:08:28 Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2024-01-27 19:08:29
2024-01-27 19:04:22 schema-registry 18:04:22.14 INFO ==>
2024-01-27 19:04:22 schema-registry 18:04:22.20 INFO ==> Welcome to the Bitnami schema-registry container
2024-01-27 19:04:22 schema-registry 18:04:22.21 INFO ==> Subscribe to project updates by watching
2024-01-27 19:04:22 schema-registry 18:04:22.22 INFO ==> Submit issues and feature requests at
Possibly this is due to Kafka not being available? Anything in the kafka
container logs? AIUI bitnami’s Kafka image gets configured via variables prefixed with KAFKA_CFG_
, e.g., see this example, so it doesn’t look properly configured.
If you’re unable to get it working, the following Kafka + Schema Registry docker-compose.yml
based on Confluent’s Docker images should work for you:
---
version: '2'
services:
broker:
image: confluentinc/cp-kafka:7.5.3
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry:
image: confluentinc/cp-schema-registry:7.5.3
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
1 Like
Thanks for the example. I used the example in the link and seems like both my kafka nodes (kafka-0 and kafka-1) and the schema-registry are working.
Although my consumer now is unable to connect to my kafka:
This is my docker-compose file:
version: "2"
services:
# Zookeper single mode
zookeeper:
image: zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka-0:
image: docker.io/bitnami/kafka:3.6
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
volumes:
- 'kafka0_data:/bitnami/kafka'
kafka-1:
image: docker.io/bitnami/kafka:3.6
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
volumes:
- 'kafka1_data:/bitnami/kafka'
schema-registry:
image: docker.io/bitnami/schema-registry:7.5
ports:
- '8081:8081'
depends_on:
- kafka-0
- kafka-1
- zookeeper
environment:
- SCHEMA_REGISTRY_LISTENERS="http://0.0.0.0:8081
- SCHEMA_REGISTRY_KAFKA_BROKERS=PLAINTEXT://kafka-0:9092,PLAINTEXT://kafka-1:9092
# Postgres SQL database
postgres:
image: debezium/example-postgres:latest
container_name: postgres
ports:
- 5433:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
# Debezium connector
debezium:
image: debezium/connect:latest
ports:
- 8083:8083
environment:
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
BOOTSTRAP_SERVERS: kafka-0:9092
links:
- zookeeper
- postgres
depends_on:
- kafka-0
- kafka-1
- zookeeper
- postgres
# Confluent control center GUI
control-center:
image: confluentinc/cp-enterprise-control-center:latest
hostname: control-center
depends_on:
- kafka-0
- kafka-1
- zookeeper
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka-0:9092'
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://localhost:8081"
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MODE_ENABLE: "management"
PORT: 9021
volumes:
kafka0_data:
driver: local
kafka1_data:
driver: local
and this is my consumer c# code:
// Kafka consumer configuration
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "general.data.protection.consumer",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false
};
var schemaRegistryUrl = ""http://localhost:8081";
const string topic = "MyTopicName";
Am I using the correct BootstrapServers address and the correct schema registry url?
I am getting this error:
%3|1706648595.821|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Unknown error (after 2052ms in state CONNECT)
Error: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Unknown error (after 2052ms in state CONNECT)
Error: 1/1 brokers are down