Schema Registry start issue on a 3-broker cluster


The above is the error I get while running Schema Registry.

hey @vasui522

are the brokers up and running?

best, michael

Hi

Brokers are up and running


Getting the above error; brokers and ZooKeeper are running.

In kafkaServer.out it is showing the below warning and error:

[4:57 PM] Innamuri Srinivasulu
[2024-08-02 16:55:47,711] WARN Cannot compute broker load because of NaN expected total service time (kafka.metrics.BrokerLoad)

[2024-08-02 16:55:47,711] WARN Failed to produce 621 metrics messages (io.confluent.telemetry.exporter.kafka.KafkaExporter)

org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for _confluent-telemetry-metrics-3:120000 ms has passed since batch creation

seems there's something wrong with the connection to the broker

does a telnet from the SR host to the broker work?
what does your overall config look like?

best,
michael

Here I am sharing the properties files, please check:

zookeeper.properties

# the directory where the snapshot is stored.
dataDir=/mnt/appln/vmuser/zookeeper_snapshots

# the port at which the clients will connect
clientPort=2181

tickTime=2000
initLimit=10
syncLimit=5

server.1=172.17.206.108:2888:3888
server.2=172.17.198.172:2888:3888
server.3=172.17.194.170:2888:3888

autopurge.snapRetainCount=3
autopurge.purgeInterval=24

# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=60

# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
admin.serverPort=8080

# Enable audit logging. See log4j.properties for audit logger configuration
audit.enable=true

server.properties

# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.

broker.id=2
#broker.id.generation.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092

listeners=PLAINTEXT://172.17.194.170:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".

advertised.listeners=PLAINTEXT://172.17.194.170:9092
#confluent.http.server.listeners=http://0.0.0.0:8090

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/mnt/appln/vmuser/kafka_logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever either of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=172.17.206.10:2181,172.17.194.170:2181,172.17.198.172:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

##################### Confluent Metrics Reporter #######################

# Confluent Control Center and Confluent Auto Data Balancer integration
# Uncomment the following lines to publish monitoring data for
# Confluent Control Center and Confluent Auto Data Balancer
# If you are using a dedicated metrics cluster, also adjust the settings
# to point to your metrics kafka cluster.
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=172.17.205.18:9092,172.17.194.178:9092,172.17.194.177:9092

# Uncomment the following line if the metrics cluster has a single broker
confluent.metrics.reporter.topic.replicas=3

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=3

############################# Confluent Authorizer Settings #############################

# Uncomment to enable Confluent Authorizer with support for ACLs, LDAP groups and RBAC
#authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer

# Semi-colon separated list of super users in the format <principalType>:<principalName>
#super.users=

# Specify a valid Confluent license. By default free-tier license will be used

confluent.license=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJjb250cm9sLWNlbnRlciIsIm5iNCI6IjE3MTE2NjEzMTAiLCJtb25pdG9yaW5nIjp0cnVlLCJsaWNlbnNlVHlwZSI6IkVudGVycHJpc2UiLCJpc3MiOiJDb25mbHVlbnQiLCJpYXQiOjE3MTE2MDkyMDAsImV4cCI6MTc0NTczNzIwMCwiYXVkIjoiMDA2VXowMDAwMDQzRk1uSUFNIn0=.RJGkxROydFvV7PNZmDW7sBKrYPh_6e5UMvx4BWO7LyEqoS4q_hKH4QtuaWILQHEBqnyEiGz3GCSgNTfauS1EjX-fzGFqkw25nBj6s6siueEL0E7nQtlwCtVVa6V8Gd3GQVc_otfyEBS3dBMPdACdNuSIlrbK092X4VjmDQs6wRluY1AFaX0cE8XtDkkaBBm8TrXoCvCNDMGXFSZ3XwUvphYEJ4IKcXoe5Czmqy1CE8C41F038YMGPD0iC-7xACXNFF0-uS9IIwYhEUS7KbaSdEs2PYh1LYPXmWj2Dn6Ij1_Iv6i6QLgtQJf_9051SeNAo8oTGAkpU5VTE3_Sr4H_SA

# Replication factor for the topic used for licensing. Default is 3.
confluent.license.topic.replication.factor=3

# Uncomment the following lines and specify values where required to enable CONFLUENT provider for RBAC and centralized ACLs

# Enable CONFLUENT provider
#confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT

# Bootstrap servers for RBAC metadata. Must be provided if this broker is not in the metadata cluster
confluent.metadata.bootstrap.servers=172.17.206.10:2181,172.17.194.170:2181,172.17.198.172:2181

# Replication factor for the metadata topic used for authorization. Default is 3.
confluent.metadata.topic.replication.factor=3

# Replication factor for the topic used for audit logs. Default is 3.
confluent.security.event.logger.exporter.kafka.topic.replicas=1

# Listeners for metadata server
#confluent.metadata.server.listeners=http://0.0.0.0:8090

# Advertised listeners for metadata server
#confluent.metadata.server.advertised.listeners=http://127.0.0.1:8090

############################# Confluent Data Balancer Settings #############################

# The Confluent Data Balancer is used to measure the load across the Kafka cluster and move data
# around as necessary. Comment out this line to disable the Data Balancer.
confluent.balancer.enable=true

# By default, the Data Balancer will only move data when an empty broker (one with no partitions on it)
# is added to the cluster or a broker failure is detected. Uncomment this line to allow the Data
# Balancer to balance load across the cluster whenever an imbalance is detected.
#confluent.balancer.heal.uneven.load.trigger=ANY_UNEVEN_LOAD

# The default time to declare a broker permanently failed is 1 hour (3600000 ms).
# Uncomment this line to turn off broker failure detection, or adjust the threshold
# to change the duration before a broker is declared failed.
#confluent.balancer.heal.broker.failure.threshold.ms=-1

# Edit and uncomment the following line to limit the network bandwidth used by data balancing operations.
# This value is in bytes/sec/broker. The default is 10MB/sec.
#confluent.balancer.throttle.bytes.per.second=10485760

# Capacity Limits -- when set to positive values, the Data Balancer will attempt to keep
# resource usage per-broker below these limits.

# Edit and uncomment this line to limit the maximum number of replicas per broker. Default is unlimited.
#confluent.balancer.max.replicas=10000

# Edit and uncomment this line to limit what fraction of the log disk (0-1.0) is used before rebalancing.
# The default (below) is 85% of the log disk.
#confluent.balancer.disk.max.load=0.85

# Edit and uncomment these lines to define a maximum network capacity per broker, in bytes per
# second. The Data Balancer will attempt to ensure that brokers are using less than this amount
# of network bandwidth when rebalancing.
# Here, 10MB/s. The default is unlimited capacity.
#confluent.balancer.network.in.max.bytes.per.second=10485760
#confluent.balancer.network.out.max.bytes.per.second=10485760

# Edit and uncomment this line to identify specific topics that should not be moved by the data balancer.
# Removal operations always move topics regardless of this setting.
#confluent.balancer.exclude.topic.names=

# Edit and uncomment this line to identify topic prefixes that should not be moved by the data balancer.
# (For example, a "confluent.balancer" prefix will match all of "confluent.balancer.a", "confluent.balancer.b",
# "confluent.balancer.c", and so on.)
# Removal operations always move topics regardless of this setting.
#confluent.balancer.exclude.topic.prefixes=

# The replication factor for the topics the Data Balancer uses to store internal state.
# For anything other than development testing, a value greater than 1 is recommended to ensure availability.
# The default value is 3.
confluent.balancer.topic.replication.factor=3

################################## Confluent Telemetry Settings ##################################

# To start using Telemetry, first generate a Confluent Cloud API key/secret. This can be done with
# instructions at https://docs.confluent.io/current/cloud/using/api-keys.html. Note that you should
# be using the '--resource cloud' flag.
# After generating an API key/secret, to enable Telemetry uncomment the lines below and paste
# in your API key/secret.
#confluent.telemetry.enabled=true
#confluent.telemetry.api.key=<CLOUD_API_KEY>
#confluent.telemetry.api.secret=<CCLOUD_API_SECRET>

min.insync.replicas=2
#unclean.leader.election.enable=true
metadata.max.age.ms=300000
auto.create.topics.enable=false

schema-registry.properties

# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# The address the socket server listens on.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=http://172.17.194.170:8081

# Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the leader schema registry instance and for storing the data for
# registered schemas.
kafkastore.bootstrap.servers=172.17.206.10:2181,172.17.194.170:2181,172.17.198.172:2181
#kafkastore.connection.url=172.17.194.178:2181
#schema.registry.group.id=schema-registry

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false

#metadata.encoder.secret=REPLACE_ME_WITH_HIGH_ENTROPY_STRING

resource.extension.class=io.confluent.dekregistry.DekRegistryResourceExtension
#master.eligibility=false

seems you're trying to connect to ZooKeeper instead of the brokers
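A sketch of the corrected line: kafkastore.bootstrap.servers should point at the brokers' PLAINTEXT listeners on port 9092, not the ZooKeeper client port 2181 (the IPs below are taken from the pasted config and assume each of those hosts also runs a broker; adjust if your broker hosts differ). Note that confluent.metadata.bootstrap.servers in server.properties also lists port 2181.

```
kafkastore.bootstrap.servers=172.17.206.10:9092,172.17.194.170:9092,172.17.198.172:9092
```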

After changing kafkastore.bootstrap.servers, the below error is coming

[11:38] Innamuri Srinivasulu
.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:192)

[2024-08-06 11:34:48,627] ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:79)

io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException: Failed to get Kafka cluster ID

    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.kafkaClusterId(KafkaSchemaRegistry.java:1982)

    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.<init>(KafkaSchemaRegistry.java:209)

    at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:73)

    at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.configureBaseApplication(SchemaRegistryRestApplication.java:103)

    at io.confluent.rest.Application.configureHandler(Application.java:324)

    at io.confluent.rest.ApplicationServer.doStart(ApplicationServer.java:212)

    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)

    at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:44)

Caused by: java.util.concurrent.TimeoutException

    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)

    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)

    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)

    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.kafkaClusterId(KafkaSchemaRegistry.java:1980)
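One way to check whether the brokers ever registered a cluster ID is to read the /cluster/id znode directly. A sketch, assuming the Confluent bin directory is on PATH and using a ZooKeeper host from the pasted zookeeper.properties:

```shell
zookeeper-shell 172.17.206.108:2181 get /cluster/id
```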

brokers running correctly?

what does a telnet with the following give you:

telnet $brokerip 9092
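If telnet and nc are missing on a host, the same reachability check can be scripted with bash's built-in /dev/tcp. A minimal sketch; the broker IPs are assumed from the pasted configs and should be adjusted to your actual broker hosts:

```shell
# Minimal TCP reachability probe using bash's /dev/tcp.
# Returns exit status 0 if the connect succeeds within 3 seconds.
port_open() {
  timeout 3 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# Probe each assumed broker host from the Schema Registry host.
for ip in 172.17.206.10 172.17.194.170 172.17.198.172; do
  port_open "$ip" 9092 && echo "$ip:9092 open" || echo "$ip:9092 unreachable"
done
```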

it is giving "Connected to" the IP address and port


now getting the below error

for all brokers, I assume?

did you try to connect via Kafka client tools like kafka-topics.sh?
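For example, a quick metadata round-trip from the Schema Registry host (a sketch; assumes the Confluent bin directory is on PATH, with the broker IP taken from the pasted server.properties):

```shell
kafka-topics --bootstrap-server 172.17.194.170:9092 --list
```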

best

yes, but the same "Failed to get Kafka cluster ID" error is coming. are any property file changes required?

I guess there is something wrong with the broker

is the kafka-topics.sh command working from the broker itself?

did you use a Confluent demo to set up the stack?
