Hi all!
I’m having trouble launching ksqlDB on a SASL-enabled Kafka cluster that does not have RBAC configured.
-
I’m using a docker-compose file to launch the ksqldb server, authenticating with a user who has been granted all privileges on the cluster.
-
It seems that the ksqldb server starts correctly (at least there aren’t any error messages in the log). It creates the control topics, such as _confluent-ksql-default__command_topic and default_ksql_processing_log. Furthermore, if I read the command topic, I can see that a stream on top of the processing log has been created (this stream is also present if I do a LIST STREAMS; in the CLI).
-
HOWEVER, if I try to create a stream (either through the CLI or the REST API), I get the following error:
Could not write the statement 'create stream ... ' into the command topic: Transactional Id authorization failed.\nCaused by: Transactional Id authorization failed.
-
So it seems that ksqlDB correctly authenticates on startup, but fails to do so when I issue commands through the CLI or the REST API. Is this expected? Is there any way around this?
-
I’ve also tried passing the auth props (authenticating as the ksql user) in the streamsProperties section of the POST request, but to no avail.
I’ve attached sanitized versions of my docker compose and the ksql command. Any help would be greatly appreciated.
docker-compose:
# Compose file for three Confluent Platform 6.0.0 containers (Schema Registry,
# Kafka Connect, ksqlDB server) that connect to a Kafka cluster over
# SASL_PLAINTEXT with SCRAM-SHA-512. All three use host networking.
#
# Sanitized copy: the *_BOOTSTRAP_SERVERS values and the JAAS passwords were
# blanked out and must be filled in before use.
version: '3'

services:
  schema_registry:
    image: confluentinc/cp-schema-registry:6.0.0
    container_name: schema_registry
    # NOTE(review): with network_mode "host" Docker does not apply published
    # ports; the mapping is kept as in the original — confirm it is needed.
    ports:
      - "8081:8081"
    environment:
      # Sanitized. NOTE(review): a key with no value makes Compose take the
      # variable from the host environment — set the broker list explicitly.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS:
      SCHEMA_REGISTRY_HOST_NAME: ${DOCKER_HOST_IP}
      SCHEMA_REGISTRY_LISTENERS: http://${DOCKER_HOST_IP}:8081
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "NONE"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SASL_PLAINTEXT
      SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: SCRAM-SHA-512
      SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"schema_user\" password=\"\";"
    network_mode: "host"

  kafka_connect:
    image: confluentinc/cp-kafka-connect:6.0.0
    container_name: kafka-connect-avro-1
    depends_on:
      - schema_registry
    # NOTE(review): see the ports remark on schema_registry (host networking).
    ports:
      - "8083:8083"
    volumes:
      - /usr/share/confluent-hub-components/:/usr/share/confluent-hub-components/
    environment:
      # Sanitized; see the note on SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS.
      CONNECT_BOOTSTRAP_SERVERS:
      # Numeric values are quoted so they reach the container as strings.
      CONNECT_REST_PORT: "8083"
      CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_SASL_MECHANISM: SCRAM-SHA-512
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"connect_user\" password=\"\";"
      CONNECT_GROUP_ID: "connect-cluster-01"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-01-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-01-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-01-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://${DOCKER_HOST_IP}:8081"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://${DOCKER_HOST_IP}:8081"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "${DOCKER_HOST_IP}"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components/,/usr/share/java
      CLASSPATH: /usr/share/confluent-hub-components/confluentinc-kafka-connect-hdfs/lib/kafka-connect-hdfs-10.0.2.jar
    network_mode: "host"

  primary_ksqldb_server:
    image: confluentinc/cp-ksqldb-server:6.0.0
    hostname: primary_ksqldb_server
    container_name: primary_ksqldb_server
    depends_on:
      - schema_registry
    # NOTE(review): see the ports remark on schema_registry (host networking).
    ports:
      - "8088:8088"
    volumes:
      - /home/aziza_k/sch-conn-ksqldb/ksql/state/primary:/state:Z
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      # Sanitized; see the note on SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS.
      KSQL_BOOTSTRAP_SERVERS:
      KSQL_SECURITY_PROTOCOL: SASL_PLAINTEXT
      KSQL_SASL_MECHANISM: SCRAM-SHA-512
      # NOTE(review): the reported "Transactional Id authorization failed"
      # error presumably means this principal lacks Describe/Write ACLs on the
      # TransactionalId resource that ksqlDB uses when writing to the command
      # topic — verify the broker ACLs for ksql_user.
      KSQL_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ksql_user\" password=\"\";"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://${DOCKER_HOST_IP}:8081
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
    network_mode: "host"
command
# POST a CREATE STREAM statement to the ksqlDB server's /ksql endpoint,
# passing SASL/SCRAM client settings as per-request streamsProperties.
# The statement text ("…") and the password were sanitized by the author.
# The JSON body is single-quoted for the shell; '\'' embeds a literal
# single quote, and \" escapes double quotes inside JSON strings.
curl -X "POST" "http://localhost:8088/ksql" \
  -d '{
  "ksql": "create stream …) with (KAFKA_TOPIC = …, VALUE_FORMAT='\''JSON'\'');",
  "streamsProperties": {
    "ksql.streams.security.protocol": "SASL_PLAINTEXT",
    "ksql.streams.sasl.mechanism": "SCRAM-SHA-512",
    "ksql.streams.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ksql_user\" password=\"\";"
  }
}'