I am trying to read a Kafka topic and stream it to my sink. To read the data, I wrote the following code.
Topic data in JSON:
{
  "HiveData": {
    "Tablename": "HiveTablename1",
    "Rowcount": "3213423",
    "lastupdateddate": "2021-02-24 13:04:14"
  },
  "HbaseData": [
    {
      "Tablename": "HbaseTablename1",
      "Rowcount": "23543",
      "precision_column": "debit_rate",
      "lastupdateddate": "2021-02-23 12:03:11"
    }
  ],
  "PostgresData": [
    {
      "Tablename": "PostgresTablename1",
      "Rowcount": "23454345",
      "lastupdateddate": "2021-02-23 12:03:11"
    }
  ]
}
Below is the code I wrote for parsing the topic:
def streamData(): Unit = {
  val kafkaDF = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "server:port")
    .option("subscribe", "topic_name")
    .load()

  val df: DataFrame = kafkaDF.select(col("topic"), expr("cast(value as string) as data"))

  df.select(from_json(col("data"), schema).as("data"))
    .selectExpr("data.HiveData.tablename")
    .writeStream
    .format("console")
    .option("truncate", value = false)
    .outputMode("append")
    .start()
    .awaitTermination()
}
Breaking down the code:
If I run just this part on its own, I can see data:
kafkaDF.select(col("topic"), expr("cast(value as string) as data"))
Output:
-------------------------------------------
Batch: 1
-------------------------------------------
+------------+----------------------------+
|topic | data |
+------------+----------------------------+
|topic_name |{"HiveData":.............|
To parse the nested JSON value in the column data, I created a schema in Scala and used it in the writeStream statement to print the data to the console, as below:
val schema = new StructType()
  .add("HiveData", new StructType()
    .add("Tablename", StringType)
    .add("Rowcount", StringType)
    .add("lastupdateddate", StringType)
  )
  .add("HbaseData", new StructType()
    .add("Tablename", StringType)
    .add("Rowcount", StringType)
    .add("precision_column", StringType)
    .add("lastupdateddate", StringType)
  )
  .add("PostgresData", new StructType()
    .add("Tablename", StringType)
    .add("Rowcount", StringType)
    .add("lastupdateddate", StringType)
    .add("column", StringType)
  )
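In case it is useful, here is how I can reproduce the same from_json step in a plain batch job against the sample record above, without Kafka or streaming involved (a minimal standalone sketch; the local SparkSession and object name exist only for this test):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

object SchemaCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("schema-check")
      .getOrCreate()
    import spark.implicits._

    // The sample record from the topic, as one JSON string.
    val sample =
      """{"HiveData":{"Tablename":"HiveTablename1","Rowcount":"3213423","lastupdateddate":"2021-02-24 13:04:14"},
        |"HbaseData":[{"Tablename":"HbaseTablename1","Rowcount":"23543","precision_column":"debit_rate","lastupdateddate":"2021-02-23 12:03:11"}],
        |"PostgresData":[{"Tablename":"PostgresTablename1","Rowcount":"23454345","lastupdateddate":"2021-02-23 12:03:11"}]}""".stripMargin

    // The same schema as in the streaming job.
    val schema = new StructType()
      .add("HiveData", new StructType()
        .add("Tablename", StringType)
        .add("Rowcount", StringType)
        .add("lastupdateddate", StringType))
      .add("HbaseData", new StructType()
        .add("Tablename", StringType)
        .add("Rowcount", StringType)
        .add("precision_column", StringType)
        .add("lastupdateddate", StringType))
      .add("PostgresData", new StructType()
        .add("Tablename", StringType)
        .add("Rowcount", StringType)
        .add("lastupdateddate", StringType)
        .add("column", StringType))

    // Apply from_json exactly as in the streaming query and inspect the result.
    Seq(sample).toDF("data")
      .select(from_json(col("data"), schema).as("data"))
      .selectExpr("data.HiveData.tablename")
      .show(truncate = false)

    spark.stop()
  }
}
```

This shows the same behaviour as the streaming query, so the problem seems to be in the schema itself rather than in the Kafka read.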
df.select(from_json(col("data"), schema).as("data"))
  .selectExpr("data.HiveData.tablename")
  .writeStream
  .format("console")
  .option("truncate", value = false)
  .outputMode("append")
  .start()
  .awaitTermination()
But after applying the nested schema to the column data, the output is all null:
-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+
|tablename |
+-------------------+
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
|null |
+-------------------+
The nested JSON has three elements: HiveData, HbaseData, and PostgresData, which I am trying to split into three separate DataFrames.
To test my logic, I tried to pull the data from only one of the elements, in this case HiveData, and write it to the console to check whether my logic is correct.
But it looks like the way I am creating the schema and applying it to the topic data is not correct, and hence it writes null when I check the output.
Could anyone point out the mistake I made here and how to correct it?