Integrating Spark Structured Streaming with the Confluent Schema Registry

I’m using a Kafka source in Spark Structured Streaming to receive records generated with Datagen in Confluent Cloud. I want to use the Confluent Schema Registry, but the Schema Registry in Confluent Cloud requires authentication settings that I don’t know how to pass:

basic.auth.credentials.source=USER_INFO secret

I think I have to pass these authentication settings to CachedSchemaRegistryClient, but I’m not sure whether that is correct or how to do it.

> // Set up the Avro deserialization UDF
> schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
> kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
> spark.udf.register("deserialize", (bytes: Array[Byte]) =>
>   kafkaAvroDeserializer.deserialize(bytes))

If I try to pass the credentials to the Schema Registry client like this:

> val restService = new RestService(schemaRegistryURL)
> val props = Map(
>   "basic.auth.credentials.source" -> "USER_INFO",
>   "" -> "secret:secret"
> ).asJava
> var schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)

I get "Cannot resolve overloaded constructor CachedSchemaRegistryClient". It seems that the constructor accepts only two parameters.

With the two-parameter constructor, this is the exception I get:

Exception in thread "main" Unauthorized; error code: 401
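
As far as I understand, with USER_INFO the client is supposed to send an HTTP Basic Authorization header built from the API key and secret, so the 401 suggests that this header is missing or wrong. Below is a minimal sketch of the header value I would expect, using only the JDK. The credentials are placeholders, and `basicAuthHeader` is a hypothetical helper of mine, not part of the Confluent client:

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object BasicAuthSketch {
  // Hypothetical helper: builds the value of the HTTP Authorization header
  // for Basic authentication from user info of the form "<key>:<secret>".
  def basicAuthHeader(userInfo: String): String = {
    val encoded = Base64.getEncoder.encodeToString(userInfo.getBytes(StandardCharsets.UTF_8))
    s"Basic $encoded"
  }

  def main(args: Array[String]): Unit = {
    // Placeholder credentials, not real ones.
    println(basicAuthHeader("secret:secret"))
  }
}
```

Sending this header manually to the registry URL (for example with an HTTP client) should at least show whether the key and secret themselves are accepted, independently of Spark.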

How do I fix this?

Any suggestions/sample code would really be great! Thanks in advance.