I tried creating a self-managed S3 Sink Connector with Docker in order to subscribe to the Confluent Cloud audit log. However, the Kafka Connect worker is rejected with a TopicAuthorizationException while setting up its worker configuration. Does anyone know why this happens? My (unconfirmed) guess is that the cause of the error lies in my Docker setup rather than in Confluent Cloud.
If anyone knows of a working sample of a self-managed S3 Sink Connector running in Docker, I'd be happy to see it.
Error
audit | [timestamp] ERROR [Worker clientId=connect-1, groupId=confluent-audit-log-events] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
audit | org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [confluent-audit-log-events-offsets]
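My understanding is that the service account behind $SASL_JAAS_CONFIG needs ACLs on the worker's internal topics. I think the grants would look roughly like this with the Confluent CLI (sa-123456 is a placeholder; flags may differ between CLI versions, and I haven't confirmed this is actually the cause). I'm also not sure whether the audit log cluster even allows creating topics and ACLs, since it is managed by Confluent:

# Placeholder service account ID; replace with the account behind $SASL_JAAS_CONFIG
SA=sa-123456

# Connect needs to create, read, and write its three internal topics
for topic in confluent-audit-log-events-config \
             confluent-audit-log-events-offsets \
             confluent-audit-log-events-status; do
  confluent kafka acl create --allow --service-account $SA --operation create --topic $topic
  confluent kafka acl create --allow --service-account $SA --operation read   --topic $topic
  confluent kafka acl create --allow --service-account $SA --operation write  --topic $topic
done

# ...and to use the consumer group named by CONNECT_GROUP_ID
confluent kafka acl create --allow --service-account $SA --operation read --consumer-group confluent-audit-log-events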
Docs about the Self-Managed S3 Sink Connector
Amazon S3 Sink Connector for Confluent Platform | Confluent Documentation
Hands-on with the Self-Managed Connector
My sample docker-compose.yml is here:
version: "2"
services:
  audit:
    image: confluentinc/cp-server-connect:7.1.1
    tty: true
    restart: always
    hostname: audit
    container_name: audit
    ports:
      - "8083:8083"
    volumes:
      - ./data:/data
    environment:
      CONNECT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
      CONNECT_GROUP_ID: "confluent-audit-log-events"
      CONNECT_CONFIG_STORAGE_TOPIC: "confluent-audit-log-events-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "confluent-audit-log-events-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "confluent-audit-log-events-status"
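      # Note: the topic named in the TopicAuthorizationException above is this
      # offset storage topic, so the worker fails while creating or reading its
      # internal topics, before any connector is even submitted.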
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      # Confluent Schema Registry for Kafka Connect
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
      CONNECT_REST_ADVERTISED_HOST_NAME: "audit"
      CONNECT_LISTENERS: http://audit:8083
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_SASL_MECHANISM: PLAIN
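      # Assumption: $SASL_JAAS_CONFIG holds the Confluent Cloud API key/secret, e.g.
      # org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";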
      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
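      # As I understand it, the admin client that creates the internal topics
      # reuses the worker-level settings above (CONNECT_SECURITY_PROTOCOL and
      # CONNECT_SASL_*), so the authorization error is raised with those credentials.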
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.3.1
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
        while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done
        echo -e "\n--\n+> Creating Kafka Connect sink connector"
        curl -i -X PUT -H "Content-Type: application/json" \
            http://localhost:8083/connectors/s3-connector/config \
            --data '
        {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
          "flush.size": "1000",
          "s3.bucket.name": "confluent-audit-log",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "tasks.max": "2",
          "topics": "confluent-audit-log-events",
          "name": "s3-connector"
        }
        '
        # Keep the container running after the connector is created
        sleep infinity
  # Other systems
  s3:
    image: adobe/s3mock
    tty: true
    environment:
      - initialBuckets=confluent-audit-log
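Once the stack is up, I check the worker and the connector with the standard Connect REST API (a quick sanity check, nothing specific to my setup):

# Worker REST API should return the connector list
curl -s http://localhost:8083/connectors

# Status of the connector created above
curl -s http://localhost:8083/connectors/s3-connector/status

One thing I'm unsure about: since the sink writes to the adobe/s3mock container rather than real AWS S3, I believe the connector config also needs the endpoint override "store.url": "http://s3:9090" (s3mock's default HTTP port), but that should be unrelated to the TopicAuthorizationException.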