Hi there,
I’m trying to integrate my Apache Beam pipeline with Confluent Kafka, but no messages are ever processed.
The pipeline uses Beam’s Python SDK with the apache_beam.io.kafka package.
I built a simple pipeline to test the integration, using the DatagenSourceConnector to generate random test data.
My code:
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline(options=self.beam_options) as pipeline:
    kafka_elements = (
        pipeline
        | "Consume from Kafka"
        >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": "SERVER:9092",
                "security.protocol": "SASL_SSL",
                "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='USER_NAME' password='PASSWORD';",
                "sasl.mechanism": "PLAIN",
                # Required for correctness in Apache Kafka clients prior to 2.6
                "client.dns.lookup": "use_all_dns_ips",
                # Best practice for higher availability in Apache Kafka clients prior to 3.0
                "session.timeout.ms": "45000",
            },
            topics=["topic_0"],
        )
        | "Print elements" >> beam.Map(print)
    )
When I run the pipeline I can see the consumers updating their offsets, yet nothing is printed. I also tried replacing 'print' with a function that simply throws an exception, but that code never triggers.
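For reference, the exception-throwing variant I tried looks roughly like this (raise_on_element is just a placeholder name I'm using here):

```python
def raise_on_element(element):
    # Debug helper: if any element ever reached this step,
    # the pipeline would fail loudly instead of printing.
    raise RuntimeError(f"Kafka element received: {element!r}")

# Swapped in for the print step in the pipeline above:
# | "Print elements" >> beam.Map(raise_on_element)
```

The pipeline runs without ever raising, which suggests no elements reach that step at all.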
Any insight into what I’m missing here would be highly appreciated.