Hi folks! I’m trying to consume Kafka topics from Apache Spark Streaming. There are no problems with the connection or with String keys and values. However, I can’t find a way to consume other types with Avro deserialization.
I want to emphasize that I’m talking about spark-streaming-kafka-0-10, where we build streams like this:
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
I’ve pushed messages with a String key and a MyCustomClass value, and now I have no idea how to consume them in Spark Streaming. How do I deserialize the value into something meaningful? For example, I’d like to get a Dataset[MyCustomClass] to do some transformations on.
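For context, here is roughly what I imagine a solution could look like: a custom Kafka Deserializer plugged into kafkaParams. This is only my sketch, not working code; MyCustomClass is assumed to be a SpecificRecord generated from an Avro schema, and MyCustomClassDeserializer is a name I made up.

```scala
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Hypothetical deserializer: turns raw Avro bytes into MyCustomClass.
// Assumes MyCustomClass is Avro-generated and exposes getClassSchema.
class MyCustomClassDeserializer extends Deserializer[MyCustomClass] {
  private val reader = new SpecificDatumReader[MyCustomClass](MyCustomClass.getClassSchema)

  override def deserialize(topic: String, data: Array[Byte]): MyCustomClass = {
    if (data == null) null
    else {
      // Plain binary-encoded Avro, i.e. no schema-registry framing is assumed here
      val decoder = DecoderFactory.get().binaryDecoder(data, null)
      reader.read(null, decoder)
    }
  }
}

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "127.0.0.1:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[MyCustomClassDeserializer],
  "group.id"           -> "my-consumer-group"
)

// The stream would then be typed with the custom value class
val stream = KafkaUtils.createDirectStream[String, MyCustomClass](
  ssc,
  PreferConsistent,
  Subscribe[String, MyCustomClass](topics, kafkaParams)
)
```

Is something along these lines the intended approach, or does Spark offer a built-in way to do this?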
On the internet there are some examples showing how to work with the Spark SQL Kafka source, but the syntax there is different. It looks something like this:
val rawTopicMessageDF = sql.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .load()
Here we already get a DataFrame, and functions like to_avro and from_avro are available to work with it. But what about the spark-streaming-kafka way?
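In those Structured Streaming examples, the value column is decoded something like this (a sketch; jsonFormatSchema is assumed to be the Avro schema of the value loaded as a JSON string, and the import path is the Spark 2.4 one, it moved to org.apache.spark.sql.avro.functions in Spark 3):

```scala
// from_avro lives in the spark-avro module (org.apache.spark:spark-avro_2.x)
import org.apache.spark.sql.avro.from_avro

// The Kafka source exposes the payload as a binary "value" column;
// from_avro decodes it against the given Avro schema into a struct
val parsed = rawTopicMessageDF
  .select(from_avro($"value", jsonFormatSchema).as("value"))
  .select("value.*")
```

But that only works with the readStream API, and I don’t see an equivalent for DStreams.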
The spark-streaming-kafka API gives us the foreachRDD method, where we get access to records of type ConsumerRecord. I’m very confused; could anyone explain to me, with example code, how to use spark-streaming-kafka with value deserialization?
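To make the question concrete, this skeleton is all I have so far; the map body is exactly where I’m stuck (I’m currently leaving the value deserializer as ByteArrayDeserializer, so the values arrive as raw bytes):

```scala
// stream is a DStream[ConsumerRecord[String, Array[Byte]]]
stream.foreachRDD { rdd =>
  // Pull out the raw Avro payload of each record
  val rawValues = rdd.map(record => record.value())
  // ... how do I turn these Array[Byte] values into a Dataset[MyCustomClass]?
}
```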