Integrating Spark Structured Streaming with the Confluent Schema Registry

I'm using a Kafka source in Spark Structured Streaming to receive records generated by Datagen in Confluent Cloud. I intend to use the Confluent Schema Registry, but the Confluent Cloud Schema Registry requires the following authentication properties, and I don't know how to supply them:

basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=secret:secret
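For reference, my understanding from the Confluent docs is that for a plain Kafka consumer these go into the deserializer properties, something like this (just a sketch; the URL and key/secret values are placeholders for my actual endpoint and API credentials):

> import java.util.Properties
> 
> val props = new Properties()
> props.put("schema.registry.url", "https://<sr-endpoint>.confluent.cloud")
> props.put("basic.auth.credentials.source", "USER_INFO")
> props.put("schema.registry.basic.auth.user.info", "<api-key>:<api-secret>")

What I can't figure out is where the equivalent configuration goes in my Spark job.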

I think I have to pass this authentication data to CachedSchemaRegistryClient, but I'm not sure whether that's right, or how to do it.

> // Set up the Avro deserialization UDF
> schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
> kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
> spark.udf.register("deserialize", (bytes: Array[Byte]) =>
>   kafkaAvroDeserializer.deserialize(bytes)
> )
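For context, I invoke the registered UDF on the Kafka source roughly like this (simplified; the bootstrap servers and topic name are placeholders):

> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "<broker>:9092")
>   .option("subscribe", "<topic>")
>   .load()
> 
> val parsed = df.selectExpr("deserialize(value) AS record")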

When I try to pass the authentication data to the Schema Registry like this:

> import scala.collection.JavaConverters._
> 
> val restService = new RestService(schemaRegistryURL)
> 
> val props = Map(
>   "basic.auth.credentials.source" -> "USER_INFO",
>   "schema.registry.basic.auth.user.info" -> "secret:secret"
> ).asJava
> 
> val schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)

I get a "Cannot resolve overloaded constructor CachedSchemaRegistryClient" error; it seems that the client version I'm using only accepts two constructor parameters.
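If I read the Javadoc correctly, newer versions of kafka-schema-registry-client (4.1.1+, I believe) add constructor overloads that accept a config map, so with an upgraded dependency I would expect the call to look roughly like this (a sketch under that assumption; the credentials are placeholders):

> import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
> import scala.collection.JavaConverters._
> 
> val configs = Map(
>   "basic.auth.credentials.source" -> "USER_INFO",
>   "schema.registry.basic.auth.user.info" -> "<api-key>:<api-secret>"
> ).asJava
> 
> // The CachedSchemaRegistryClient(String, int, Map<String, ?>) overload only
> // exists in newer client versions, which would explain the resolution error.
> val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128, configs)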

Currently, this is the exception I am facing:

Exception in thread "main" io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

How do I fix this?

Any suggestions/sample code would really be great! Thanks in advance.