Consuming with Airflow ConsumeFromTopicOperator

Has anyone had success using Apache Airflow’s ConsumeFromTopicOperator to consume messages from a Confluent Cloud cluster? We are trying to get a basic consumer running in our Airflow environment but we keep running into this error: ‘Subscribed topic not available: orders: Broker: Topic authorization failed’.

Our Kafka config looks like this:

Connection(
    conn_id="kafka_conn_id",
	conn_type="Kafka",
	extra= json.dumps({
		"bootstrap.servers":<BOOTSTRAP_ADDRESS>,
		"security.protocol":"SASL_SSL",
		"sasl.mechanisms":"PLAIN",
		"sasl.username":<API_KEY>,
		"sasl.password":<API_SECRET>,
		"group.id":"my-group",
		"auto.offset.reset":"earliest",
		"enable.auto.commit":False,
	})
)

We have double checked our topic ACLs, tried with multiple topics, and tried using an admin’s API key which should have unrestricted access. All of those attempts resulted in the same topic authorization error. Any other ideas would be greatly appreciated.