Unable to specify the SASL configuration for the schema history

We have an instance of Kafka Connect running on a self-hosted node that connects to AWS MSK over SASL_SSL. I am running into an issue where the SQL Server connector runs out of memory, and the underlying error looks like an auth failure for the internal schema history client; in another environment where Kafka is deployed with no auth, the same setup works fine.

We have added the settings below to connect-distributed.properties, but they are not actually being picked up: the config values in the log still show PLAINTEXT. The only way to get the connector working is to pass these values in the connector create API call, after which the connector works fine, prints the correct values in the consumer and producer configs, and is able to establish the schema history connection (a trimmed example of that create call is shown after the config blocks below).
Auth is successful for the connector's data topic (verified in the config logs that the SASL-related properties are set properly) but fails for the internal schema history client.

# existing config
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=SCRAM-SHA-512
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';
ssl.truststore.type=PEM
producer.ssl.truststore.type=PEM
ssl.truststore.location=/etc/pki/tls/certs/ca-bundle.crt
producer.ssl.truststore.location=/etc/pki/tls/certs/ca-bundle.crt

# Newly added config
schema.history.internal.producer.security.protocol=SASL_SSL
schema.history.internal.producer.sasl.mechanism=SCRAM-SHA-512
schema.history.internal.producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';
schema.history.internal.consumer.security.protocol=SASL_SSL
schema.history.internal.consumer.sasl.mechanism=SCRAM-SHA-512
schema.history.internal.consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';
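
For reference, passing the same settings in the connector create API is what works. A trimmed-down sketch of the config section of that create call (host names, database details, and credentials are placeholders):

{
  "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  "database.hostname": "<sqlserver-host>",
  "database.port": "1433",
  "database.user": "<db-user>",
  "database.password": "<db-pass>",
  "database.names": "<db-name>",
  "topic.prefix": "xxxxx-config",
  "schema.history.internal.kafka.topic": "schemahistory.xxxxx-config",
  "schema.history.internal.kafka.bootstrap.servers": "aaaa.kafka.us-east-1.amazonaws.com:9096,bbbb.kafka.us-east-1.amazonaws.com:9096,cccc.kafka.us-east-1.amazonaws.com:9096",
  "schema.history.internal.producer.security.protocol": "SASL_SSL",
  "schema.history.internal.producer.sasl.mechanism": "SCRAM-SHA-512",
  "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';",
  "schema.history.internal.consumer.security.protocol": "SASL_SSL",
  "schema.history.internal.consumer.sasl.mechanism": "SCRAM-SHA-512",
  "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';"
}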

Error log

INFO [Producer clientId=xxxxx-config-schemahistory] Node -2 disconnected. (org.apache.kafka.clients.NetworkClient:977)
INFO [Producer clientId=xxxxx-config-schemahistory] Cancelled in-flight API_VERSIONS request with correlation id 0 due to node -2 being disconnected (elapsed time since creation: 296ms, elapsed time since send: 296ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:344)
WARN [Producer clientId=xxxxx-config-schemahistory] Bootstrap broker bbbb.kafka.us-east-1.amazonaws.com:9096 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1105)
INFO App info kafka.consumer for xxxxx-config-schemahistory unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
ERROR WorkerSourceTask{id=xxxxx-config-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)

Along with this, it prints out the ConsumerConfig and ProducerConfig values:

[2024-02-27 06:22:15,095] INFO ProducerConfig values: 
	acks = 1
	auto.include.jmx.reporter = true
	batch.size = 32768
	bootstrap.servers = [aaaa.kafka.us-east-1.amazonaws.com:9096, bbbb.kafka.us-east-1.amazonaws.com:9096, cccc.kafka.us-east-1.amazonaws.com:9096]
	buffer.memory = 1048576
	client.dns.lookup = use_all_dns_ips
	client.id = xxxxx-config-schemahistory
	***removed***
	sasl.jaas.config = null
	***removed***
	sasl.mechanism = GSSAPI
	***removed***
	security.protocol = PLAINTEXT
	security.providers = null
        ***removed***
 (org.apache.kafka.clients.producer.ProducerConfig:370)
[2024-02-27 06:22:15,100] INFO ConsumerConfig values: 
	***removed***
	bootstrap.servers = [aaaa.kafka.us-east-1.amazonaws.com:9096, bbbb.kafka.us-east-1.amazonaws.com:9096, cccc.kafka.us-east-1.amazonaws.com:9096]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = xxxxx-config-schemahistory
	***removed***
	fetch.min.bytes = 1
	group.id = xxxxx-config-schemahistory
	***removed***
	sasl.jaas.config = null
        ***removed***
	sasl.mechanism = GSSAPI
        ***removed***
	security.protocol = PLAINTEXT
	security.providers = null
        ***removed***
 (org.apache.kafka.clients.consumer.ConsumerConfig:370)

TLDR

  1. The schema history client does not work with a SASL-secured managed Kafka (MSK) instance.
  2. On passing the config in the connector create API call, everything works.
  3. It does not work when the same values are set in connect-distributed.properties.
  4. Is there no config file where these settings can be added? The apprehension towards adding them in the create call is to avoid persisting the Kafka credential in the client.
  5. The connection to the connector's data topic works fine; the only issue is the internal schema history client, which does not pick up the right auth-related config.

What version of the connector are you using? Possibly 1.9 or earlier? For Debezium 2+ that config looks correct, but for 1.9 and earlier the properties should begin with database.history.consumer. and database.history.producer. (see here).
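
For 1.9 and earlier, the equivalent of the newly added block above would look like this (same placeholder credentials, shown only to illustrate the older prefix):

# Debezium 1.9 and earlier only
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=SCRAM-SHA-512
database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=SCRAM-SHA-512
database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';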

We are using the 2.3.0.Final SQL Server connector.
Is there anything else I can check to figure out why the right config is not being used?

One thing to investigate is the logging that starts with KafkaSchemaHistory Consumer config: in v2.3, rather than the logging in the original post (INFO ConsumerConfig values:). The docs here cover setting INFO-level logging on io.debezium.relational.history. That logging might shed some light, e.g., it could rule out the client config as a red herring.
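
If that logger isn't already at INFO, something along these lines in the worker's log4j config (connect-log4j.properties in a stock Apache Kafka install; adjust to your distribution) should surface it:

# illustrative — enable the schema history config logging
log4j.logger.io.debezium.relational.history=INFO
log4j.logger.io.debezium.storage.kafka.history=INFO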

The Debezium JIRA is another channel where you might want to ask if you hit a wall.

I do have the INFO KafkaSchemaHistory log lines printed. I am posting both variants: the one that works via the API and the one that does not work via the config file.

Incorrect config being picked up (settings from connect-distributed.properties)

INFO KafkaSchemaHistory Consumer config: {key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, enable.auto.commit=false, group.id=xxxxx-config-schemahistory, bootstrap.servers=aaaa.kafka.us-east-1.amazonaws.com:9096,bbbb.kafka.us-east-1.amazonaws.com:9096,cccc.kafka.us-east-1.amazonaws.com:9096, fetch.min.bytes=1, session.timeout.ms=10000, auto.offset.reset=earliest, client.id=xxxxx-config-schemahistory} (io.debezium.storage.kafka.history.KafkaSchemaHistory:245)
INFO KafkaSchemaHistory Producer config: {retries=1, value.serializer=org.apache.kafka.common.serialization.StringSerializer, acks=1, batch.size=32768, max.block.ms=10000, bootstrap.servers=aaaa.kafka.us-east-1.amazonaws.com:9096,bbbb.kafka.us-east-1.amazonaws.com:9096,cccc.kafka.us-east-1.amazonaws.com:9096, buffer.memory=1048576, key.serializer=org.apache.kafka.common.serialization.StringSerializer, client.id=xxxxx-config-schemahistory, linger.ms=0} (io.debezium.storage.kafka.history.KafkaSchemaHistory:246)

Working logs (config passed via the API)

INFO KafkaSchemaHistory Consumer config: {key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, group.id=xxxxx-config-schemahistory, bootstrap.servers=aaaa.kafka.us-east-1.amazonaws.com:9096,bbbb.kafka.us-east-1.amazonaws.com:9096,cccc.kafka.us-east-1.amazonaws.com:9096, security.protocol=SASL_SSL, enable.auto.commit=false, sasl.mechanism=SCRAM-SHA-512, sasl.jaas.config=********, fetch.min.bytes=1, session.timeout.ms=10000, auto.offset.reset=earliest, client.id=xxxxx-config-schemahistory} (io.debezium.storage.kafka.history.KafkaSchemaHistory:245)
INFO KafkaSchemaHistory Producer config: {batch.size=32768, acks=1, bootstrap.servers=aaaa.kafka.us-east-1.amazonaws.com:9096,bbbb.kafka.us-east-1.amazonaws.com:9096,cccc.kafka.us-east-1.amazonaws.com:9096, buffer.memory=1048576, key.serializer=org.apache.kafka.common.serialization.StringSerializer, security.protocol=SASL_SSL, retries=1, value.serializer=org.apache.kafka.common.serialization.StringSerializer, sasl.mechanism=SCRAM-SHA-512, sasl.jaas.config=********, max.block.ms=10000, client.id=xxxxx-config-schemahistory, linger.ms=0} (io.debezium.storage.kafka.history.KafkaSchemaHistory:246)

Sorry, I’m rereading this thread and see that I completely misunderstood the original question and went down the wrong path.

What you attempted to do, putting Debezium connector-specific config in the worker properties, isn't supported by Connect. Only a specific allow list of properties is inherited by connectors from the worker properties, i.e., the worker doesn't pass arbitrary properties through, which is why the schema.history.internal.* settings only take effect when they are part of the connector config itself.

To solve this part of the question:

avoid persisting the Kafka credential in the client

I’d recommend checking out the Apache 2.0 licensed secret providers here (the first four). E.g., with the Vault provider, the worker config file holds the Vault token (or it uses the VAULT_TOKEN environment variable), and the connector config just holds a pointer to the secret in Vault. A minimal sketch of the pattern is below.
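
All of those providers plug into Connect's standard ConfigProvider mechanism. As an illustration of the pattern, here is a sketch using the FileConfigProvider that ships with Kafka (the Vault provider is registered the same way with its own class and settings; the file path and key below are illustrative):

# connect-distributed.properties — register a config provider
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

The connector config then references the secret instead of embedding it, e.g.:

"schema.history.internal.producer.sasl.jaas.config": "${file:/etc/kafka/secrets/msk.properties:sasl.jaas.config}",
"schema.history.internal.consumer.sasl.jaas.config": "${file:/etc/kafka/secrets/msk.properties:sasl.jaas.config}"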
