I’m using a Kafka Source in Spark Streaming to receive records generated using Datagen in Confluent Cloud. I intend to use Confluent Schema Registry, the schema registry of confluent cloud requires to pass some authentication data that I don’t know how to enter them:
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=secret: secret
I think I have to pass this authentication data to CachedSchemaRegistryClient but I’m not sure if so and how.
> // Setup the Avro deserialization UDF
> schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
> kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
> spark.udf.register("deserialize", (bytes: Array[Byte]) =>
> kafkaAvroDeserializer.deserialize(bytes)
If I am trying to send authentication to schema registry as
> val restService = new RestService(schemaRegistryURL)
>
> val props = Map(
> "basic.auth.credentials.source" -> "USER_INFO",
> "schema.registry.basic.auth.user.info" -> "secret:secret"
> ).asJava
>
> var schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)
I get Cannot resolve overloaded constructor
CachedSchemaRegistryClient , seems that only 2 parameters are to be sent to CachedSchemaRegistryClient.
Currently, this is the exception I am facing :
Exception in thread “main” io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
How do I fix this?
Any suggestions/sample code would really be great! Thanks in advance.