Hi Team,
I need help connecting to Confluent Kafka (security protocol: SASL_SSL, SASL mechanism: PLAIN) using PySpark 2.4.4 on a Spark cluster running version 2.4.7.
Existing code:
kafkaParams = {
    "bootstrap.servers": "pkc-ldvmy.centralus.azure.confluent.cloud:9092",
    "spark.kafka.security.protocol": "SASL_SSL",
    "spark.kafka.sasl.mechanism": "PLAIN",
    "spark.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='",
    "spark.kafka.ssl.truststore.location": "",
    "spark.kafka.ssl.truststore.password": "cert password",
    "spark.kafka.ssl.truststore.type": "JKS"
}
stream = KafkaUtils.createDirectStream(
    ssc,
    [],
    kafkaParams,
    fromOffsets=None  # or specify starting offsets
)
When I submit the above snippet with spark-submit:
spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4,org.apache.kafka:kafka-clients:2.4.1 example.py
I get the following error:
24/08/21 17:09:14 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, elastic-1724238278336-exec-5, 43871, None)
24/08/21 17:09:14 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, elastic-1724238278336-exec-5, 43871, None)
24/08/21 17:09:14 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/code/spark-warehouse').
24/08/21 17:09:14 INFO SharedState: Warehouse path is 'file:/code/spark-warehouse'.
24/08/21 17:09:15 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
24/08/21 17:09:15 INFO VerifiableProperties: Verifying properties
24/08/21 17:09:15 INFO VerifiableProperties: Property group.id is overridden to
24/08/21 17:09:15 WARN VerifiableProperties: Property spark.kafka.sasl.jaas.config is not valid
24/08/21 17:09:15 WARN VerifiableProperties: Property spark.kafka.sasl.mechanism is not valid
24/08/21 17:09:15 WARN VerifiableProperties: Property spark.kafka.security.protocol is not valid
24/08/21 17:09:15 WARN VerifiableProperties: Property spark.kafka.ssl.truststore.location is not valid
24/08/21 17:09:15 WARN VerifiableProperties: Property spark.kafka.ssl.truststore.password is not valid
24/08/21 17:09:15 WARN VerifiableProperties: Property spark.kafka.ssl.truststore.type is not valid
24/08/21 17:09:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to
24/08/21 17:09:20 INFO SimpleConsumer: Reconnect due to socket error: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
Traceback (most recent call last):
  File "/code/example.py", line 28, in <module>
    fromOffsets=None # or specify starting offsets
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 146, in createDirectStream
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o36.createDirectStreamWithoutMessageHandler.
: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)
	at scala.util.Either.fold(Either.scala:98)
	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:384)
	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
	at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:720)
	at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
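Reading the log, every property with the spark.kafka. prefix is reported as "not valid", and the stack trace goes through org.apache.spark.streaming.kafka (SimpleConsumer, zookeeper.connect). As far as I can tell that is the legacy Kafka 0.8 DStream integration, which has no SASL/SSL support. One direction that may be worth trying is the Structured Streaming Kafka source from the spark-sql-kafka-0-10 package already passed to --packages, where Kafka client settings take the kafka. prefix. The sketch below is only an illustration under that assumption: the helper names build_kafka_options and read_kafka_stream, the credential placeholders, and the topic name are all hypothetical, and truststore settings are left out because whether they are needed depends on the environment. The Scala suffix of the package (_2.11 or _2.12) would also need to match the cluster build.

```python
def build_kafka_options(bootstrap_servers, api_key, api_secret, topic):
    """Build options for the Structured Streaming Kafka source.

    Kafka client settings use the "kafka." prefix here, not "spark.kafka.".
    All argument values are placeholders supplied by the caller.
    """
    jaas_template = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="{user}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas_template.format(
            user=api_key, password=api_secret
        ),
        # Source options (no "kafka." prefix): which topic to read and
        # where to start when no checkpoint exists yet.
        "subscribe": topic,
        "startingOffsets": "latest",
    }


def read_kafka_stream(spark, options):
    """Return a streaming DataFrame; `spark` is an existing SparkSession."""
    return spark.readStream.format("kafka").options(**options).load()


if __name__ == "__main__":
    # Hypothetical values, shown only to illustrate the resulting keys.
    opts = build_kafka_options(
        "broker.example.com:9092", "API_KEY", "API_SECRET", "my-topic"
    )
    for key in sorted(opts):
        print(key)
```

With a SparkSession in hand, the call would be read_kafka_stream(spark, opts), and the resulting DataFrame exposes the key and value columns as binary.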
Any help will be appreciated.