I need to use kafka-protobuf-console-consumer to subscribe to a topic and get data in JSON format from a publisher that only supports SSL connections. The issues I face are:
I am not able to find a consumer configuration that is applicable to kafka-protobuf-console-consumer.
While I can define and export KAFKA_OPTS and debug kafka-console-consumer, when I kick off kafka-protobuf-console-consumer the debug output does not seem to work.
Inside the file pointed to by --consumer.config I am declaring the properties for the SSL connection, something like the following (adjusted to my setup):
bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Should I pass schema.registry.url on the command line, or should it also go in the file pointed to by --consumer.config?
Also, should schema.registry.url point to just the server and port, like --property schema.registry.url=http://myserver.com:8888? How will it determine the schema if there are multiple subjects? Will that be based on the topic to which we are connecting?
If the schema registry also supports SSL, what other parameters do I need to add to the file pointed to by --consumer.config?
I am trying to run a command like kafka-protobuf-console-consumer --bootstrap-server myserver.com:8888 --topic mytopic --property schema.registry.url=http://myserver.com:8888 --group mygroup_name. Does anyone see an issue with this?
Can someone here help me with how I can successfully run kafka-protobuf-console-consumer over an SSL connection?
@adonis I can share with you an example command that works for me; however, I'm using Confluent Cloud, which uses the SASL_SSL settings, so your setup may require some adjustments. For reasons I cannot explain, some of the properties for the schema registry do not seem to be read from the configuration file; instead they need to be provided as command arguments.
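The command itself looks roughly like this; the topic name, group name, and properties file name below are placeholders for my actual values, so substitute your own:
kafka-protobuf-console-consumer \
  --bootstrap-server cluster.region.cloud.confluent.cloud:9092 \
  --topic mytopic \
  --group mygroup_name \
  --consumer.config client.properties \
  --property schema.registry.url=https://cluster.region.cloud.confluent.cloud \
  --property basic.auth.credentials.source=USER_INFO \
  --property basic.auth.user.info=key:secret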
And my configuration file looks something like this:
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=cluster.region.cloud.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='key' password='secret';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://cluster.region.cloud.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=key:secret
As far as debugging, you could try setting the following (but maybe you’ve already tried this):
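For example, you can turn on the JVM's TLS debug output through KAFKA_OPTS before launching the consumer (the exact debug categories to use are up to you; this is just one option):
# print JSSE debug output for the TLS handshake when the consumer starts
export KAFKA_OPTS="-Djavax.net.debug=ssl:handshake"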
Thanks @rick, that was very helpful, I appreciate it. I was also able to run the command that I provided in my query. In my case the problem was the value of one of the parameters in my configuration file; once I corrected it, things worked fine for me.
However, I see kafka-protobuf-console-consumer also has a --partition parameter, and I have 3 partitions for my topic. If I do not provide the --partition option, will I get data from all 3 partitions or just one of them?
You only need the --partition argument if your use case is to read only from a specific partition. If you just want to consume from the topic generally, it is not required, and you will receive data from all partitions.
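For illustration, a run pinned to a single partition could look something like the sketch below (server, topic, and config file names are placeholders, and I leave out --group since --partition does manual assignment):
kafka-protobuf-console-consumer \
  --bootstrap-server myserver.com:9093 \
  --topic mytopic \
  --partition 0 \
  --offset earliest \
  --consumer.config client-ssl.properties \
  --property schema.registry.url=http://myserver.com:8081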
Hi @rick,
Actually, we decided to go with a group id instead of a specific partition, but I have a few more things to understand.
1) I tried to provide the parameter schema.registry.url="http://localhost:8081,http://abcd.com:8081". This resulted in an exception while using kafka-protobuf-console-consumer, with the message Error deserializing Protobuf message .... For input string "8081,http". It seems to treat this comma-separated list as a single string.
2) Also, is it feasible to recover and store the last message sequence processed by a particular group id using the console consumer, so that if there is a disconnect or an issue I can retrieve data from the same position? I see we have --offset, which controls how far back we want to go, but that probably does not fulfil my requirement.
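To illustrate what I mean, I assume the committed position for my group can be inspected with the kafka-consumer-groups tool, roughly like below (server, group, and config file names are placeholders); what I'm unsure about is whether the console consumer can resume from that point after a disconnect.
kafka-consumer-groups --bootstrap-server myserver.com:9093 \
  --command-config client-ssl.properties \
  --describe --group mygroup_name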