Hello, I'm trying to connect to the Schema Registry using KafkaIO, but I receive this error:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
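This is my consumer configuration and pipeline:
// Schema Registry URL and basic-auth credentials, passed in through the Kafka consumer config
props.put("schema.registry.url", "https://schema_registry_url");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "KEY:PASSWORD");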
PCollection<KV<String, String>> entries =
    pipeline.apply(
        "Read Entries from Confluent Cloud Topic",
        KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrapserver)
            .withTopic(TOPIC)
            .withConsumerConfigUpdates(props)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(SCHEMA_URL, TOPIC))
            .withoutMetadata());