Spark - How to parse a Kafka topic that contains nested JSON data?

I am trying to read a Kafka topic and stream it to my sink. To read the data, I wrote the code below. First, here is a sample message from the topic, in JSON:

{
  "HiveData": {
    "Tablename": "HiveTablename1",
    "Rowcount": "3213423",
    "lastupdateddate": "2021-02-24 13:04:14"
  },
  "HbaseData": [
    {
      "Tablename": "HbaseTablename1",
      "Rowcount": "23543",
      "precision_column": "debit_rate",
      "lastupdateddate": "2021-02-23 12:03:11"
    }
  ],
  "PostgresData": [
    {
      "Tablename": "PostgresTablename1",
      "Rowcount": "23454345",
      "lastupdateddate": "2021-02-23 12:03:11"
    }
  ]
}
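
As a sanity check, the sample payload can also be parsed in batch mode so that Spark infers its own schema, which can then be compared against a hand-written one (a minimal sketch; it assumes an existing SparkSession named spark):

import spark.implicits._

// one-element Dataset[String] holding the sample payload shown above
val sample = Seq("""{"HiveData":{"Tablename":"HiveTablename1","Rowcount":"3213423","lastupdateddate":"2021-02-24 13:04:14"},"HbaseData":[{"Tablename":"HbaseTablename1","Rowcount":"23543","precision_column":"debit_rate","lastupdateddate":"2021-02-23 12:03:11"}],"PostgresData":[{"Tablename":"PostgresTablename1","Rowcount":"23454345","lastupdateddate":"2021-02-23 12:03:11"}]}""").toDS()

// printSchema shows which top-level fields Spark sees as structs and which as arrays
spark.read.json(sample).printSchema()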

Below is the code I wrote for parsing the topic:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, expr, from_json}

def streamData(): Unit = {
  val kafkaDF = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "server:port")
    .option("subscribe", "topic_name")
    .load()

  // value arrives as binary, so cast it to a string before parsing
  val df: DataFrame = kafkaDF.select(col("topic"), expr("cast (value as String) as data"))

  // schema is the StructType defined further down
  df.select(from_json(col("data"), schema).as("data"))
    .selectExpr("data.HiveData.tablename")
    .writeStream
    .format("console")
    .option("truncate", value = false)
    .outputMode("append")
    .start()
    .awaitTermination()
}
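
For reference, printing the source schema right after load() shows the raw columns the Kafka source exposes; key and value arrive as binary, which is why the cast to string is needed:

kafkaDF.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)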

Breaking down the code:
If I run just this part on its own, I can see the data.

kafkaDF.select(col("topic"), expr("cast (value as String) as data"))

Output:

    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +------------+----------------------------+
    |topic       |          data              |
    +------------+----------------------------+
    |topic_name  |{"HiveData":.............|

To parse the nested JSON in the data column, I created a schema in Scala and applied it with from_json, writing the result to the console as below:

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("HiveData", new StructType()
    .add("Tablename", StringType)
    .add("Rowcount", StringType)
    .add("lastupdateddate", StringType)
  )
  .add("HbaseData", new StructType()
    .add("Tablename", StringType)
    .add("Rowcount", StringType)
    .add("precision_column", StringType)
    .add("lastupdateddate", StringType)
  )
  .add("PostgresData", new StructType()
    .add("Tablename", StringType)
    .add("Rowcount", StringType)
    .add("lastupdateddate", StringType)
    .add("column", StringType)
  )

df.select(from_json(col("data"), schema).as("data"))
  .selectExpr("data.HiveData.tablename")
  .writeStream
  .format("console")
  .option("truncate", value = false)
  .outputMode("append")
  .start()
  .awaitTermination()

But after applying the nested schema to the data column, everything in the output is null:

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+
|tablename          |
+-------------------+
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
+-------------------+

In the nested JSON there are three elements, HiveData, HbaseData, and PostgresData, which I am trying to put into three separate DataFrames.

To test my logic, I tried to pull the data from just one of the elements, HiveData, and write it to the console to check whether my logic is correct.
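
A batch-mode version of that test, reusing the sample Dataset from the sanity check above (and the same schema and imports as in the streaming code), isolates the schema from the Kafka plumbing:

// apply the hand-written schema to the sample payload outside the stream
val parsed = sample.toDF("data")
  .select(from_json(col("data"), schema).as("data"))

// from_json returns null for any row it cannot parse with the given schema,
// so a null here points at a schema/payload mismatch rather than at the stream
parsed.select("data.HiveData.Tablename").show(false)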

But it looks like the way I am creating the schema and applying it to the topic is not correct, and hence nulls are written when I check the output.
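
One thing I am unsure about: HbaseData and PostgresData are JSON arrays in the payload, but my schema declares them as plain structs. In case that mismatch is the problem, here is a sketch of an array-aware variant (the tableStats and schemaWithArrays names are just for illustration):

import org.apache.spark.sql.types._

// shared shape of the per-table stats objects
val tableStats = new StructType()
  .add("Tablename", StringType)
  .add("Rowcount", StringType)
  .add("lastupdateddate", StringType)

// wrap the array-valued fields in ArrayType
val schemaWithArrays = new StructType()
  .add("HiveData", tableStats)
  .add("HbaseData", ArrayType(tableStats.add("precision_column", StringType)))
  .add("PostgresData", ArrayType(tableStats))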

Could anyone let me know what mistake I made here and how to correct it?

Are you using Kafka Connect for this? Usually that’s how people stream data from a Kafka topic to targets like Postgres. If this is a Spark question then perhaps #clients might be a better place to move this topic to. If you can clarify this I can do that for you.

@Rmoff, I have updated the question with a better explanation. Yes, this is a Kafka + Spark Streaming question. Could you please move it to the corresponding section (#clients)?

Thanks for the help.
