Kafka protobuf console consumer SSL connection and configuration

I need to use kafka-protobuf-console-consumer to subscribe to a topic and get data in json format from a publisher which only supports SSL connection. Issue i face is :

  1. I am not able to find consumer configuration that would be applicable for kafka-protobuf-console-consumer.
  2. While i can define and export a KAFKA_OPTS and Debug the “kafka-console-consumer” , trying to kick off the “kafka-protobuf-console-consumer”, debug does not seem to be working.
  3. Inside file pointed by --consumer.config i am declaring variables for SSL connection, something like below but appropriate to my setup.
    bootstrap.servers=kafka1:9093
    security.protocol=SSL
    ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
  4. Should i pass schema.registry.url in command line or should that be also put in file pointed by --consumer.config .
    5.Also should schema.registry.url point to just server and port like --property schema.registry.url=http://myserver.com:8888, how will it determine schema if there are multiple subjects ? Will it be based on topic to which we are connecting ?
  5. If schema.registry.url also support SSL then what are all the other parameters that i need to add in my file pointed by --consumer.config
  6. i am trying to run command like kafka-protobuf-console-consumer --bootstrap-server myserver.com:8888 --topic mytopic --property schema.registry.url=http://myserver.com:8888 --group mygroup_name . Does anyone see an issue with this ?
    Can someone here help me with how can a successfully run a kafka protobuf console consumer using SSL connection ?

@adonis I can share with you an example command that works for me, however, I’m using Confluent Cloud which utilizies the SASL_SSL settings. So your setup may require some adjustments. For reasons I cannot explain, some of the properties for the schema registry do not seem to be read from the configuration file, instead they need to be provided as command arguments.

kafka-protobuf-console-consumer --bootstrap-server \ 
cluster.region.cloud.confluent.cloud:9092 \
--property schema.registry.url=https://sr.region.cloud.confluent.cloud \
--property basic.auth.credentials.source=USER_INFO \
--property schema.registry.basic.auth.user.info=key:secret \
--consumer.config path-to-config-file --topic my-topic --from-beginning

And my configuration file looks something like this:

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=cluster.region.cloud.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='key'   password='secret';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://cluster.region.cloud.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=key:secret

As far as debugging, you could try setting the following (but maybe you’ve already tried this):

export KAFKA_OPTS=-Djavax.net.debug=all
1 Like

Thanks @rick , that was very helpful. Appreciate your help. I was able to run the command too, that i have provided in my Query. In my case problem was some parameter setting in below , when i corrected the value for below parameter things worked fine for me.

security.protocol=SSL ssl.truststore.location=/opt/kafka/config/kafka.truststore.jks ssl.truststore.password= *trustore_password* ssl.keystore.location=/opt/kafka/config/client.keystore.jks ssl.keystore.password= *keystore_password* ssl.key.password= *key_password* ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.client.auth=required

However i see kafka-protobuf-console-consumer also has parameter --partitions . I see that i have 3 partitions each broker. So if i do not provide --partitions options will this get me data for all 3 partitions or just any one of them ?

You only need the --partition argument if your use case is to read only from a specific partition. If you just want to consume from the topic generally it is not required and you will receive data from all partitions.

1 Like

HI @rick ,
Actually we decided to go with group-id instead of partition. But i had few more things to understand.
I tried to provide the parameter
schema.registry.url=“http://localhost:8081,http://abcd.com:8081”. This resulted into an exception while using kafka-protobuf-console-consumer with message Error deserializing Protobuf message .... For input string "8081,http" . It seems to be considering this list ( comma separated ) as a single text.
2) Also is there a feasibility of recovering and storing last message sequence processed by a particular group id using console consumer. So that if there is a disconnect or an issue i can retrieve data from same sequence ? I see we have --offset which tells how much back we want to go, but probably that is something not fulfilling my requirement.

Thanks