I’m using a Kafka source in Spark Streaming to receive records generated with Datagen in Confluent Cloud. I intend to use Confluent Schema Registry, but the Schema Registry in Confluent Cloud requires authentication credentials, and I don’t know how to supply them.
I think I have to pass these credentials to CachedSchemaRegistryClient, but I’m not sure whether that is correct or how to do it.
> // Set up the Avro deserialization UDF
> schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
> kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
> spark.udf.register("deserialize", (bytes: Array[Byte]) =>
>   kafkaAvroDeserializer.deserialize(bytes)
> )
When I try to pass the credentials to the Schema Registry like this:
> val restService = new RestService(schemaRegistryURL)
>
> val props = Map(
>   "basic.auth.credentials.source" -> "USER_INFO",
>   "schema.registry.basic.auth.user.info" -> "secret:secret"
> ).asJava
>
> var schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)
I get the error "Cannot resolve overloaded constructor CachedSchemaRegistryClient". It seems that the CachedSchemaRegistryClient constructor accepts only two parameters.
This is the exception I currently get:
Exception in thread "main" io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
How do I fix this?
Any suggestions/sample code would really be great! Thanks in advance.