Self-managed S3 Sink Connector in Docker doesn't work properly

I tried creating a self-managed S3 Sink Connector in Docker in order to subscribe to the Confluent Cloud audit log. However, the Kafka Connect worker fails with a "TopicAuthorizationException" when it starts up with my worker configuration. Do you know why this happens? My (uncertain) guess is that the cause of the Kafka Connect error lies on the Docker side, not in Confluent Cloud.
If anyone knows of a working example of a self-managed S3 Sink Connector in Docker, I'd be happy to see it.

Error

audit    | [timestamp] ERROR [Worker clientId=connect-1, groupId=confluent-audit-log-events] Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
audit    | org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [confluent-audit-log-events-offsets]

Docs about Self-Managed S3 Sink Connector

Amazon S3 Sink Connector for Confluent Platform | Confluent Documentation

Hands-on tutorial about self-managed connectors

Run Self-Managed Kafka Connect in Docker Containers

My docker-compose.yml is below.

version: "2"
services:
  audit:
    image: confluentinc/cp-server-connect:7.1.1
    tty: true
    restart: always
    hostname: audit
    container_name: audit
    ports:
      - "8083:8083"
    volumes:
      - ./data:/data
    environment:
      CONNECT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
      CONNECT_GROUP_ID: "confluent-audit-log-events"

      CONNECT_CONFIG_STORAGE_TOPIC: "confluent-audit-log-events-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "confluent-audit-log-events-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "confluent-audit-log-events-status"

      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"

      # Confluent Schema Registry for Kafka Connect
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO

      CONNECT_REST_ADVERTISED_HOST_NAME: "audit"
      CONNECT_LISTENERS: http://audit:8083
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"

      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"

      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_SASL_MECHANISM: PLAIN

      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN

      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN

    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.3.1
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #

        echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
        while : ; do
            curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
            echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
            if [ $$curl_status -eq 200 ] ; then
              break
            fi
            sleep 5
        done

        echo -e "\n--\n+> Creating Kafka Connect source connectors"
        curl -i -X PUT -H "Content-Type: application/json" \
          http://localhost:8083/connectors/s3-connector/config \
          --data '
            {
              "connector.class": "io.confluent.connect.s3.S3SinkConnector",
              "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
              "flush.size": "1000",
              "s3.bucket.name": "confluent-audit-log",
              "storage.class": "io.confluent.connect.s3.storage.S3Storage",
              "tasks.max": "2",
              "topics": "confluent-audit-log-events",
              "name": "s3-sink",
            }
          '

  # Other systems
  s3:
    image: adobe/s3mock
    tty: true
    environment:
      - initialBuckets=confluent-audit-log
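
For reference, the compose file expects these environment variables to be supplied (for example via a .env file next to docker-compose.yml). The values below are placeholders, not my real endpoints or credentials:

# .env (placeholders; substitute your own Confluent Cloud endpoints and API key/secret)
BOOTSTRAP_SERVERS=pkc-xxxxx.<region>.<provider>.confluent.cloud:9092
SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";
SCHEMA_REGISTRY_URL=https://psrc-xxxxx.<region>.<provider>.confluent.cloud
BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=<SR_API_KEY>:<SR_API_SECRET>

After docker compose up -d, the worker's REST API at http://localhost:8083/connectors should list the connector once the startup script has created it.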

hey @jack

does your role grant you permission to manage your org? that would be necessary to enable audit logs.

see Access and consume Confluent Cloud audit logs | Confluent Documentation
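
one quick way to check is to list your role bindings with the CLI (just a sketch; the user id below is a placeholder, and you'd be looking for an org-level admin role):

$ confluent iam rbac role-binding list --principal User:u-xxxxxx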

best,
michael

Thank you for your reply!
Yes, I have permission to manage the audit log.
I also created an API key for the audit log cluster as shown below, and added that information to docker-compose.yml.

$ confluent audit-log describe
+-----------------+----------------------------+
| Cluster         | lkc-oq53go                 |
| Environment     | env-xmo0jg                 |
| Service Account | sa-3wq0v2                  |
| Topic Name      | confluent-audit-log-events |
+-----------------+----------------------------+
$ confluent environment use env-xmo0jg
Now using "env-xmo0jg" as the default (active) environment.
$ confluent kafka cluster use lkc-oq53go
Set Kafka cluster "lkc-oq53go" as the active cluster for environment "env-xmo0jg".
$ confluent api-key list --resource lkc-oq53go
  Current |       Key        | Description |   Owner   |        Owner Email         | Resource Type |  Resource  |       Created
----------+------------------+-------------+-----------+----------------------------+---------------+------------+-----------------------
  *       | PMIKMEK72FQRAOMV |             | sa-3wq0v2 | <auditlog service account> | kafka         | lkc-oq53go | TIME

I think the problem is that the audit log cluster managed by Confluent Cloud doesn't give us enough permission to create topics, such as the Connect worker's internal "config", "offset", and "status" storage topics.
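
As a quick sanity check (a sketch, with the CLI still pointed at the audit log environment and cluster as above, and "connect-test" just a throwaway topic name), trying to create any topic there should be rejected:

$ confluent kafka topic create connect-test --cluster lkc-oq53go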

I found that I need to split the producer and consumer BOOTSTRAP_SERVERS from the worker's in order to subscribe to the audit data from the Confluent audit log cluster (the worker's own BOOTSTRAP_SERVERS can then point at a cluster where it is allowed to create its internal topics).

      # Connect producer
      CONNECT_PRODUCER_BOOTSTRAP_SERVERS: $PRODUCE_CONSUME_BOOTSTRAP_SERVERS
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: $PRODUCE_CONSUME_SASL_JAAS_CONFIG
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN

      # Connect consumer
      CONNECT_CONSUMER_BOOTSTRAP_SERVERS: $PRODUCE_CONSUME_BOOTSTRAP_SERVERS
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: $PRODUCE_CONSUME_SASL_JAAS_CONFIG
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
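
For completeness, here is a sketch of how the worker-level settings could look under this split; the WORKER_* variable names are just placeholders I'm introducing, and the cluster they point at has to be one where this API key can create the config/offset/status topics:

      # Connect worker (a cluster where the internal topics can be created)
      CONNECT_BOOTSTRAP_SERVERS: $WORKER_BOOTSTRAP_SERVERS
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: $WORKER_SASL_JAAS_CONFIG
      CONNECT_SASL_MECHANISM: PLAIN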
