Kafka Spark Streaming integration

Hi folks! I’m trying to consume Kafka topics with Apache Spark Streaming. There are no problems with the connection or with String key-values. However, I can’t find a way to consume other types with Avro deserialization.
I want to emphasize that I’m talking about spark-streaming-kafka-0-10, where we build streams like this:

val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

I’ve pushed messages with a String key and a MyCustomClass value, and now I have no idea how to consume them in Spark Streaming. How do I deserialize the value into something meaningful? For example, I’d like to create a Dataset[MyCustomClass] to do some transformations.
On the internet there are some examples that show how to work with the Spark SQL Kafka source, but the syntax there is different. It looks something like this:

val rawTopicMessageDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .load()

Here we already read some kind of DataFrame, and functions like to_avro and from_avro are available to work with it. But what about the spark-streaming-kafka way?
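From what I’ve seen, from_avro takes the value column plus the writer’s Avro schema as a JSON string. Something like this (the MyCustomClass schema here is just my guess at the shape, and as far as I understand plain from_avro does not strip Confluent’s wire-format header):

```scala
import org.apache.spark.sql.avro.from_avro  // Spark 2.4; in Spark 3.x it moved to org.apache.spark.sql.avro.functions
import spark.implicits._

// Hypothetical Avro schema for MyCustomClass, as a JSON string
val avroSchema =
  """{"type":"record","name":"MyCustomClass","fields":[
    |  {"name":"id","type":"long"},
    |  {"name":"name","type":"string"}
    |]}""".stripMargin

val parsed = rawTopicMessageDF
  .select(from_avro($"value", avroSchema).as("record")) // value is the raw Kafka payload (binary)
  .select("record.*")                                    // flatten into typed columns
```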
The spark-streaming-kafka API gives us the foreachRDD method, where we get access to records of type ConsumerRecord. I’m very confused. Could anyone explain to me, with example code, how to use spark-streaming-kafka with value deserialization?


First, with the Structured Streaming approach, I don’t think you have access to ConsumerRecord at any point. But you could simply start with createDirectStream[Array[Byte], Array[Byte]] and do the deserialization on your own by instantiating a KafkaAvroDeserializer.
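A minimal sketch of that idea, assuming Confluent’s schema registry (the broker address, registry URL, and group id below are placeholders, and ssc/topics are as in your snippet):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.spark.streaming.kafka010._
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import scala.collection.JavaConverters._

// Consumer config, but with byte-array deserializers so Spark passes the raw payload through
val kafkaParams = Map[String, Object](
  "bootstrap.servers"   -> "localhost:9092",         // placeholder: your brokers
  "key.deserializer"    -> classOf[ByteArrayDeserializer],
  "value.deserializer"  -> classOf[ByteArrayDeserializer],
  "group.id"            -> "my-group"                // placeholder
)

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[Array[Byte], Array[Byte]](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  rdd.mapPartitions { records =>
    // Create the deserializer per partition, on the executor:
    // KafkaAvroDeserializer is not serializable, so it can't be built on the driver.
    val deserializer = new KafkaAvroDeserializer()
    deserializer.configure(
      Map("schema.registry.url" -> "http://localhost:8081").asJava, // placeholder registry
      false) // false = this is a value (not key) deserializer
    records.map { record: ConsumerRecord[Array[Byte], Array[Byte]] =>
      // Yields an Avro GenericRecord; map its fields into MyCustomClass from here
      deserializer.deserialize(record.topic(), record.value())
    }
  }.foreach(println)
}
```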

Structured Streaming should be preferred, however. You’d use SQL UDFs to wrap the standard Kafka deserializers.
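Roughly like this (a sketch, again with a placeholder registry URL; the lazy singleton exists because the deserializer isn’t serializable, so each executor JVM builds its own):

```scala
import org.apache.spark.sql.functions.udf
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import scala.collection.JavaConverters._
import spark.implicits._

// One deserializer per executor JVM; the object itself is never shipped from the driver
object AvroDeser {
  lazy val deserializer: KafkaAvroDeserializer = {
    val d = new KafkaAvroDeserializer()
    d.configure(Map("schema.registry.url" -> "http://localhost:8081").asJava, false) // placeholder
    d
  }
}

// UDF turning the raw value bytes into the record's JSON text
// (GenericRecord.toString renders Avro as JSON; the topic name is only used for subject lookup)
val deserialize = udf { bytes: Array[Byte] =>
  AvroDeser.deserializer.deserialize("", bytes).toString
}

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", topicName)
  .load()
  .select(deserialize($"value").as("json"))
// From here, from_json plus a schema gives typed columns, and .as[MyCustomClass] a Dataset
```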

There are several answers regarding the schema registry interaction here: Integrating Spark Structured Streaming with the Confluent Schema Registry (Stack Overflow)