I have a PySpark application that consumes messages from a Kafka topic on Confluent. During deserialisation, however, it returns incorrect values. I have checked the schema and the messages, and they look fine (see the check sketched below). I am using Spark 3.3.0; this is how I deploy it:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-avro_2.12:3.3.0 stream_ads.py
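When I say the messages look fine: spot-checking a few records with the plain confluent-kafka client deserialises them correctly. The check was along these lines (the registry URL, broker address, credentials, and topic name below are placeholders, not my real values):

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# The deserialiser fetches the writer schema from Schema Registry
sr_client = SchemaRegistryClient({
    "url": "https://<registry-endpoint>",
    "basic.auth.user.info": "<api-key>:<api-secret>",
})
deserializer = AvroDeserializer(sr_client)

consumer = Consumer({
    "bootstrap.servers": "<broker>:9092",
    "group.id": "debug-check",
    "auto.offset.reset": "earliest",
    # SASL settings omitted here
})
consumer.subscribe(["<topic>"])

msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    record = deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
    print(record)  # prints the expected dict with sane clicks/views/costs
consumer.close()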
The schema:
{
  "type": "record",
  "name": "ctr",
  "namespace": "com.adclicks",
  "fields": [
    {"name": "ad_id", "type": "string"},
    {"name": "timestamp", "type": "string"},
    {"name": "clicks", "type": "int"},
    {"name": "views", "type": "int"},
    {"name": "costs", "type": "double"}
  ]
}
Here is the snippet:
from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro

# Read the Avro schema the producer uses
with open("mock_data_schema.json", "r") as f:
    json_format_schema = f.read()

# Stream raw records from Kafka; the payload arrives in the binary "value" column
df = spark.readStream.format("kafka") \
    .options(**kafka_options) \
    .load()

# Deserialise the value column against the Avro schema
df = df \
    .select(from_avro(col("value"), json_format_schema).alias("data")) \
    .select("data.*")

query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="3 seconds") \
    .start()

query.awaitTermination()
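For completeness, kafka_options is roughly the following (bootstrap servers, topic name, and credentials are placeholders):

kafka_options = {
    "kafka.bootstrap.servers": "<broker>:9092",
    "subscribe": "<topic>",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='<api-key>' password='<api-secret>';",
}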
This is the output:
+-----+---------+------+-----+----------------------+
|ad_id|timestamp|clicks|views|costs |
+-----+---------+------+-----+----------------------+
| | |-1 |84035|1.5673984559807316E-76|
| | |-1 |84035|1.567405013908699E-76 |
| | |-1 |84035|1.5673958049080624E-76|
| | |-1 |84035|1.5673997634004282E-76|
+-----+---------+------+-----+----------------------+
What could be wrong?