Hi folks! I’m trying to consume Kafka topics from Apache Spark for stream processing. There are no problems with the connection or with String keys and values. However, I can’t find a way to consume other types with Avro deserialization.
I want to emphasize that I’m talking about spark-streaming-kafka-0-10, where we build streams like this:
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
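For reference, my kafkaParams for the working String case follow the official integration guide (the group id and offset settings here are just placeholders):

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "127.0.0.1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group", // placeholder
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)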
I’ve pushed messages with String for the key and MyCustomClass for the value, and now I have no idea how to consume them in Spark Streaming. How do I deserialize the value into something meaningful? For example, I’d like to create a Dataset[MyCustomClass] to do some transformations.
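My guess is that I need to plug in a custom Kafka Deserializer for the value and change the type parameters, roughly like the sketch below, but I don’t know whether this is the intended approach or how the Avro decoding itself should be done (MyCustomClassDeserializer is hypothetical, just my attempt):

import org.apache.kafka.common.serialization.Deserializer

class MyCustomClassDeserializer extends Deserializer[MyCustomClass] {
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()
  // Presumably the Avro decoding goes here, but which Avro reader,
  // and where does the schema come from?
  override def deserialize(topic: String, data: Array[Byte]): MyCustomClass = ???
}

// Then register it in kafkaParams and change the type parameters:
// "value.deserializer" -> classOf[MyCustomClassDeserializer]
val stream = KafkaUtils.createDirectStream[String, MyCustomClass](
  ssc,
  PreferConsistent,
  Subscribe[String, MyCustomClass](topics, kafkaParams)
)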
On the internet there are some examples that show how to work with the Spark Kafka SQL source (Structured Streaming), but the syntax there is different. It looks something like this:
val rawTopicMessageDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .load()
Here we already get a DataFrame back, and functions like to_avro and from_avro are available to work with it.
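If I understand those examples correctly, the value would be parsed roughly like the sketch below. This assumes the spark-avro package is on the classpath, uses the Spark 3.x import path, and jsonSchema is a placeholder for the actual Avro schema of MyCustomClass:

import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

// Placeholder: the real Avro schema of MyCustomClass as a JSON string
val jsonSchema = """{"type":"record","name":"MyCustomClass","fields":[]}"""

val parsedDF = rawTopicMessageDF
  .select(from_avro(col("value"), jsonSchema).as("value"))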
But what about the spark-streaming-kafka way? That API gives us the foreachRDD method, where we get access to records of type ConsumerRecord. I’m very confused. Could anyone explain to me, with example code, how to use spark-streaming-kafka with value deserialization?
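To make it concrete, here is roughly how far I get. This sketch assumes I register ByteArrayDeserializer for the value so that I receive the raw bytes; the ??? is exactly where I’m stuck:

import org.apache.kafka.common.serialization.ByteArrayDeserializer

// assuming kafkaParams now has "value.deserializer" -> classOf[ByteArrayDeserializer]
val rawStream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc,
  PreferConsistent,
  Subscribe[String, Array[Byte]](topics, kafkaParams)
)

rawStream.foreachRDD { rdd =>
  rdd.foreach { record => // record: ConsumerRecord[String, Array[Byte]]
    val key: String = record.key()
    val rawValue: Array[Byte] = record.value()
    val value: MyCustomClass = ??? // how do I Avro-decode these bytes into MyCustomClass?
  }
}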