Hi
Brokers are up and running
in kafkaServer.out it is showing below warning and error
[4:57 PM] Innamuri Srinivasulu
[2024-08-02 16:55:47,711] WARN Cannot compute broker load because of NaN expected total service time (kafka.metrics.BrokerLoad)
[2024-08-02 16:55:47,711] WARN Failed to produce 621 metrics messages (io.confluent.telemetry.exporter.kafka.KafkaExporter)
org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for _confluent-telemetry-metrics-3:120000 ms has passed since batch creation
seems there’s something wrong in the connection to the broker
does a telnet from SR host to the broker work?
how does you overall config look like?
best,
michael
Here i am sharing properties file check once
Zookeeper
the directory where the snapshot is stored.
dataDir=/mnt/appln/vmuser/zookeeper_snapshots
the port at which the clients will connect
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.17.206.108:2888:3888
server.2=172.17.198.172:2888:3888
server.3=172.17.194.170:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=60
Disable the adminserver by default to avoid port conflicts.
Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
admin.serverPort=8080
Enable audit logging. See log4j.properties for audit logger configuration
audit.enable=true
server.properties
This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
See kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
#broker.id.generation.enable=true
############################# Socket Server Settings #############################
The address the socket server listens on. If not configured, the host name will be equal to the value of
java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
FORMAT:
listeners = listener_name://host_name:port
EXAMPLE:
listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://172.17.194.170:9092
Listener name, hostname and port the broker will advertise to clients.
If not set, it uses the value for “listeners”.
advertised.listeners=PLAINTEXT://172.17.194.170:9092
#confluent.http.server.listeners=http://0.0.0.0:8090
Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
A comma separated list of directories under which to store log files
log.dirs=/mnt/appln/vmuser/kafka_logs
The default number of log partitions per topic. More partitions allow greater
parallelism for consumption, but this will also result in more files across
the brokers.
num.partitions=3
The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
The replication factor for the group metadata internal topics “__consumer_offsets” and “__transaction_state”
For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Log Flush Policy #############################
Messages are immediately written to the filesystem but by default we only fsync() to sync
the OS cache lazily. The following configurations control the flush of data to disk.
There are a few important trade-offs here:
1. Durability: Unflushed data may be lost if you are not using replication.
2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
The settings below allow one to configure the flush policy to flush data after a period of time or
every N messages (or both). This can be done globally and overridden on a per-topic basis.
The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
The following configurations control the disposal of log segments. The policy can
be set to delete segments after a period of time, or after a given size has accumulated.
A segment will be deleted whenever either of these criteria are met. Deletion always happens
from the end of the log.
The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
A size-based retention policy for logs. Segments are pruned from the log unless the remaining
segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824
The interval at which log segments are checked to see if they can be deleted according
to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
Zookeeper connection string (see zookeeper docs for details).
This is a comma separated host:port pairs, each corresponding to a zk
server. e.g. “127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”.
You can also append an optional chroot string to the urls to specify the
root directory for all kafka znodes.
zookeeper.connect=172.17.206.10:2181,172.17.194.170:2181,172.17.198.172:2181
Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
##################### Confluent Metrics Reporter #######################
Confluent Control Center and Confluent Auto Data Balancer integration
Uncomment the following lines to publish monitoring data for
Confluent Control Center and Confluent Auto Data Balancer
If you are using a dedicated metrics cluster, also adjust the settings
to point to your metrics kakfa cluster.
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=172.17.205.18:9092,172.17.194.178:9092,172.17.194.177:9092
Uncomment the following line if the metrics cluster has a single broker
confluent.metrics.reporter.topic.replicas=3
############################# Group Coordinator Settings #############################
The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
The default value for this is 3 seconds.
We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=3
############################# Confluent Authorizer Settings #############################
Uncomment to enable Confluent Authorizer with support for ACLs, LDAP groups and RBAC
#authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
Semi-colon separated list of super users in the format :
#super.users=
Specify a valid Confluent license. By default free-tier license will be used
confluent.license=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJjb250cm9sLWNlbnRlciIsIm5iNCI6IjE3MTE2NjEzMTAiLCJtb25pdG9yaW5nIjp0cnVlLCJsaWNlbnNlVHlwZSI6IkVudGVycHJpc2UiLCJpc3MiOiJDb25mbHVlbnQiLCJpYXQiOjE3MTE2MDkyMDAsImV4cCI6MTc0NTczNzIwMCwiYXVkIjoiMDA2VXowMDAwMDQzRk1uSUFNIn0=.RJGkxROydFvV7PNZmDW7sBKrYPh_6e5UMvx4BWO7LyEqoS4q_hKH4QtuaWILQHEBqnyEiGz3GCSgNTfauS1EjX-fzGFqkw25nBj6s6siueEL0E7nQtlwCtVVa6V8Gd3GQVc_otfyEBS3dBMPdACdNuSIlrbK092X4VjmDQs6wRluY1AFaX0cE8XtDkkaBBm8TrXoCvCNDMGXFSZ3XwUvphYEJ4IKcXoe5Czmqy1CE8C41F038YMGPD0iC-7xACXNFF0-uS9IIwYhEUS7KbaSdEs2PYh1LYPXmWj2Dn6Ij1_Iv6i6QLgtQJf_9051SeNAo8oTGAkpU5VTE3_Sr4H_SA
Replication factor for the topic used for licensing. Default is 3.
confluent.license.topic.replication.factor=3
Uncomment the following lines and specify values where required to enable CONFLUENT provider for RBAC and centralized ACLs
Enable CONFLUENT provider
#confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT
Bootstrap servers for RBAC metadata. Must be provided if this broker is not in the metadata cluster
confluent.metadata.bootstrap.servers=172.17.206.10:2181,172.17.194.170:2181,172.17.198.172:2181
Replication factor for the metadata topic used for authorization. Default is 3.
confluent.metadata.topic.replication.factor=3
Replication factor for the topic used for audit logs. Default is 3.
confluent.security.event.logger.exporter.kafka.topic.replicas=1
Listeners for metadata server
#confluent.metadata.server.listeners=http://0.0.0.0:8090
Advertised listeners for metadata server
#confluent.metadata.server.advertised.listeners=http://127.0.0.1:8090
############################# Confluent Data Balancer Settings #############################
The Confluent Data Balancer is used to measure the load across the Kafka cluster and move data
around as necessary. Comment out this line to disable the Data Balancer.
confluent.balancer.enable=true
By default, the Data Balancer will only move data when an empty broker (one with no partitions on it)
is added to the cluster or a broker failure is detected. Comment out this line to allow the Data
Balancer to balance load across the cluster whenever an imbalance is detected.
#confluent.balancer.heal.uneven.load.trigger=ANY_UNEVEN_LOAD
The default time to declare a broker permanently failed is 1 hour (3600000 ms).
Uncomment this line to turn off broker failure detection, or adjust the threshold
to change the duration before a broker is declared failed.
#confluent.balancer.heal.broker.failure.threshold.ms=-1
Edit and uncomment the following line to limit the network bandwidth used by data balancing operations.
This value is in bytes/sec/broker. The default is 10MB/sec.
#confluent.balancer.throttle.bytes.per.second=10485760
Capacity Limits – when set to positive values, the Data Balancer will attempt to keep
resource usage per-broker below these limits.
Edit and uncomment this line to limit the maximum number of replicas per broker. Default is unlimited.
#confluent.balancer.max.replicas=10000
Edit and uncomment this line to limit what fraction of the log disk (0-1.0) is used before rebalancing.
The default (below) is 85% of the log disk.
#confluent.balancer.disk.max.load=0.85
Edit and uncomment these lines to define a maximum network capacity per broker, in bytes per
second. The Data Balancer will attempt to ensure that brokers are using less than this amount
of network bandwidth when rebalancing.
Here, 10MB/s. The default is unlimited capacity.
#confluent.balancer.network.in.max.bytes.per.second=10485760
#confluent.balancer.network.out.max.bytes.per.second=10485760
Edit and uncomment this line to identify specific topics that should not be moved by the data balancer.
Removal operations always move topics regardless of this setting.
#confluent.balancer.exclude.topic.names=
Edit and uncomment this line to identify topic prefixes that should not be moved by the data balancer.
(For example, a “confluent.balancer” prefix will match all of “confluent.balancer.a”, “confluent.balancer.b”,
“confluent.balancer.c”, and so on.)
Removal operations always move topics regardless of this setting.
#confluent.balancer.exclude.topic.prefixes=
The replication factor for the topics the Data Balancer uses to store internal state.
For anything other than development testing, a value greater than 1 is recommended to ensure availability.
The default value is 3.
confluent.balancer.topic.replication.factor=3
################################## Confluent Telemetry Settings ##################################
To start using Telemetry, first generate a Confluent Cloud API key/secret. This can be done with
instructions at https://docs.confluent.io/current/cloud/using/api-keys.html. Note that you should
be using the ‘–resource cloud’ flag.
After generating an API key/secret, to enable Telemetry uncomment the lines below and paste
in your API key/secret.
#confluent.telemetry.enabled=true
#confluent.telemetry.api.key=<CLOUD_API_KEY>
#confluent.telemetry.api.secret=<CCLOUD_API_SECRET>
min.insync.replicas=2
#unclean.leader.election.enable=true
metadata.max.age.ms=300000
auto.create.topics.enable=false
Schemeregistry properties
Copyright 2018 Confluent Inc.
Licensed under the Apache License, Version 2.0 (the “License”);
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Apache License, Version 2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an “AS IS” BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
The address the socket server listens on.
FORMAT:
listeners = listener_name://host_name:port
EXAMPLE:
listeners = PLAINTEXT://your.host.name:9092
listeners=http://172.17.194.170:8081
Use this setting to specify the bootstrap servers for your Kafka cluster and it
will be used both for selecting the leader schema registry instance and for storing the data for
registered schemas.
kafkastore.bootstrap.servers=172.17.206.10:2181,172.17.194.170:2181,172.17.198.172:2181
#kafkastore.connection.url=172.17.194.178:2181
#schema.registry.group.id=schema-registry
The name of the topic to store schemas in
kafkastore.topic=_schemas
If true, API requests that fail will include extra debugging information, including stack traces
debug=false
#metadata.encoder.secret=REPLACE_ME_WITH_HIGH_ENTROPY_STRING
resource.extension.class=io.confluent.dekregistry.DekRegistryResourceExtension
#master.eligibility=false
seems you try to connect to zookeeper instead of the brokers
After changing kafka.bootsrap.servers below error coming
[11:38] Innamuri Srinivasulu
.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:192)
[2024-08-06 11:34:48,627] ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:79)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException: Failed to get Kafka cluster ID
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.kafkaClusterId(KafkaSchemaRegistry.java:1982)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.<init>(KafkaSchemaRegistry.java:209)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:73)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.configureBaseApplication(SchemaRegistryRestApplication.java:103)
at io.confluent.rest.Application.configureHandler(Application.java:324)
at io.confluent.rest.ApplicationServer.doStart(ApplicationServer.java:212)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:44)
Caused by: java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.kafkaClusterId(KafkaSchemaRegistry.java:1980)
brokers running correctly?
what does a telnet with the following gives you:
telnet $brokerip 9092
it is giving connected to ip address and port
for all brokers I assume?
did you try to connect via kafka client tools like kafka-topic.sh?
best
yes but same error coming failed to get kafka cluster id any property file changes required
I guess there is something wrong with the broker
kafka-topic.sh command working from the broker itself?
did you use a confluent demo to setup the stack?
A very helpful and insightful discussion.