KSQLDB on SASL enabled Kafka cluster

Hi all!

I’m having trouble launching ksqlDB on a SASL-enabled Kafka cluster that does not have RBAC configured.

  • I’m using a docker-compose file to launch the ksqldb server, authenticating with a user who has been granted all privileges on the cluster.

  • It seems that the ksqldb server starts correctly (at least there aren’t any error messages in the log). It creates the control topics, such as _confluent-ksql-default__command_topic and default_ksql_processing_log. Furthermore, if I read the command topic, I can see that a stream on top of the processing log has been created (this stream is also present if I do a LIST STREAMS; in the CLI).

  • HOWEVER, if I try to create a stream (either through the CLI or the REST API), I get the following error:

    Could not write the statement 'create stream ... ' into the command topic: Transactional Id authorization failed.\nCaused by: Transactional Id authorization failed.

  • So it seems that ksqlDB authenticates correctly on startup, but fails to do so when I issue commands through the CLI or REST API. Is this expected? Is there any way around this?

  • I’ve also tried passing the auth properties (authenticating as the ksql user) in the streamsProperties section of the POST request, but to no avail.

I’ve attached sanitized versions of my docker compose and the ksql command. Any help would be greatly appreciated.

docker-compose:

# Compose file format version 3.
version: '3'

# Containers started by this file; each service is keyed by its name below.
services:
  # Confluent Schema Registry. Its Kafka store client authenticates over
  # SASL_PLAINTEXT with SCRAM-SHA-512 as "schema_user".
  schema_registry:
    image: confluentinc/cp-schema-registry:6.0.0
    container_name: schema_registry
    # No `ports:` mapping: with host networking the listener on 8081 binds
    # directly on the host, and published ports are discarded (or rejected)
    # when combined with network_mode: host.
    environment:
      # Broker list was elided in the post; supply host:port pairs here.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS:
      SCHEMA_REGISTRY_HOST_NAME: "${DOCKER_HOST_IP}"
      SCHEMA_REGISTRY_LISTENERS: "http://${DOCKER_HOST_IP}:8081"
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "NONE"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SASL_PLAINTEXT
      SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: SCRAM-SHA-512
      # Password was elided in the post.
      SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"schema_user\" password=\"\";"
    network_mode: "host"
    
  # Kafka Connect worker (distributed mode, group "connect-cluster-01") using
  # String keys and Avro values backed by the Schema Registry on port 8081.
  # The worker authenticates to Kafka as "connect_user" via SCRAM-SHA-512.
  kafka_connect:
    image: confluentinc/cp-kafka-connect:6.0.0
    container_name: kafka-connect-avro-1
    depends_on:
      - schema_registry
    # No `ports:` mapping: with host networking the REST port 8083 binds
    # directly on the host, and published ports are discarded (or rejected)
    # when combined with network_mode: host.
    volumes:
      - /usr/share/confluent-hub-components/:/usr/share/confluent-hub-components/
    environment:
      # Broker list was elided in the post; supply host:port pairs here.
      CONNECT_BOOTSTRAP_SERVERS:
      # Numeric settings are quoted so they are passed as strings.
      CONNECT_REST_PORT: "8083"
      CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_SASL_MECHANISM: SCRAM-SHA-512
      # Password was elided in the post.
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"connect_user\" password=\"\";"
      # NOTE(review): only worker-level SASL settings are present; no
      # CONNECT_PRODUCER_* / CONNECT_CONSUMER_* security overrides are set.
      # Confirm whether the connectors' own clients need them on this cluster.
      CONNECT_GROUP_ID: "connect-cluster-01"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-01-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-01-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-01-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://${DOCKER_HOST_IP}:8081"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://${DOCKER_HOST_IP}:8081"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "${DOCKER_HOST_IP}"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components/,/usr/share/java
      CLASSPATH: /usr/share/confluent-hub-components/confluentinc-kafka-connect-hdfs/lib/kafka-connect-hdfs-10.0.2.jar
    network_mode: "host"

  # ksqlDB server listening on 0.0.0.0:8088, authenticating to Kafka as
  # "ksql_user" via SCRAM-SHA-512 over SASL_PLAINTEXT.
  #
  # NOTE(review): the "Transactional Id authorization failed" error reported
  # in the post comes from the broker's authorizer, not from this file.
  # Presumably ksql_user also needs DESCRIBE/WRITE on the TransactionalId
  # resource matching this server's ksql.service.id (not set here, so the
  # default applies) - confirm against the broker ACLs.
  primary_ksqldb_server:
    image: confluentinc/cp-ksqldb-server:6.0.0
    hostname: primary_ksqldb_server
    container_name: primary_ksqldb_server
    depends_on:
      - schema_registry
    # No `ports:` mapping: with host networking the listener on 8088 binds
    # directly on the host, and published ports are discarded (or rejected)
    # when combined with network_mode: host.
    volumes:
      # NOTE(review): nothing below points ksqlDB's state directory at /state;
      # confirm this mount is actually used.
      - /home/aziza_k/sch-conn-ksqldb/ksql/state/primary:/state:Z
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      # Broker list was elided in the post; supply host:port pairs here.
      KSQL_BOOTSTRAP_SERVERS:
      KSQL_SECURITY_PROTOCOL: SASL_PLAINTEXT
      KSQL_SASL_MECHANISM: SCRAM-SHA-512
      # Password was elided in the post.
      KSQL_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ksql_user\" password=\"\";"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://${DOCKER_HOST_IP}:8081"
      # Have the server create the processing-log topic and the stream on it.
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
    network_mode: "host"

command

# POST a CREATE STREAM statement to the ksqlDB REST endpoint, passing the SASL
# client settings as per-request streamsProperties overrides.
# The JSON body is fed through a quoted heredoc so the shell leaves the inner
# single and double quotes untouched. The "…" parts and the password were
# elided in the post.
curl -X POST "http://localhost:8088/ksql" \
  -d @- <<'EOF'
{
  "ksql": "create stream …) with (KAFKA_TOPIC = …, VALUE_FORMAT='JSON');",
  "streamsProperties": {
    "ksql.streams.security.protocol": "SASL_PLAINTEXT",
    "ksql.streams.sasl.mechanism": "SCRAM-SHA-512",
    "ksql.streams.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ksql_user\" password=\"\";"
  }
}
EOF