I am new to Spark and Scala. I am trying to parse a nested JSON column from a Spark table. Here is a sneak peek of the table (I only show the first row; the rest all look identical):
doc.show(1)
doc_content                     object_id    object_version
{"id":"lni001","pub_date"....   20220301     7098727
The structure of the "doc_content" column of each row looks like this (some rows may have more information stored inside the 'content' field):
{
  "id": "lni001",
  "pub_date": "20220301",
  "doc_id": "7098727",
  "unique_id": "64WP-UI-POLI",
  "content": [
    {
      "c_id": "002",
      "p_id": "P02",
      "type": "org",
      "source": "internet"
    },
    {
      "c_id": "003",
      "p_id": "P03",
      "type": "org",
      "source": "internet"
    },
    {
      "c_id": "005",
      "p_id": "K01",
      "type": "people",
      "source": "news"
    }
  ]
}
I tried to use explode on the "doc_content" column:
doc.select(explode($"doc_content") as "doc_content")
  .withColumn("id", col("doc_content.id"))
  .withColumn("pub_date", col("doc_content.pub_date"))
  .withColumn("doc_id", col("doc_content.doc_id"))
  .withColumn("unique_id", col("doc_content.unique_id"))
  .withColumn("content", col("doc_content.content"))
  .withColumn("content", explode($"content"))
  .withColumn("c_id", col("content.c_id"))
  .withColumn("p_id", col("content.p_id"))
  .withColumn("type", col("content.type"))
  .withColumn("source", col("content.source"))
  .drop(col("doc_content"))
  .drop(col("content"))
  .show()
but I got this error:
org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`doc_content`)' due to data type mismatch: input to function explode should be array or map type, not string;
I am struggling with converting the column into an Array or Map type (probably because I am new to Scala, LOL).
After parsing the "doc_content" column, I want the table to look like this:
id      pub_date  doc_id   unique_id     c_id  p_id  type    source    object_id  object_version
lni001  20220301  7098727  64WP-UI-POLI  002   P02   org     internet  20220301   7098727
lni001  20220301  7098727  64WP-UI-POLI  003   P03   org     internet  20220301   7098727
lni001  20220301  7098727  64WP-UI-POLI  005   K01   people  news      20220301   7098727
I am wondering how I can do this, and it would be great to get some ideas or approaches. A faster alternative to my approach would also be welcome, since the Spark table has millions of rows.
Thanks!
CodePudding user response:
You can use from_json to parse the JSON string into a struct column, then use explode on the array inside it to create new rows. In other words, you should explode doc_content.content rather than doc_content itself: doc_content is a plain string column, which is why explode raised the AnalysisException.
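As a quick check (assuming your DataFrame is named doc as in the question), printing the schema should confirm that doc_content comes through as a plain string:

doc.printSchema()
// doc_content should show up as: string (nullable = true)
// explode only accepts array or map columns, hence the AnalysisException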
Specify the schema used to parse the JSON string:
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("id", StringType)
  .add("pub_date", StringType)
  .add("doc_id", StringType)
  .add("unique_id", StringType)
  .add("content", ArrayType(MapType(StringType, StringType)))
Then parse the JSON string and explode the content array:
import org.apache.spark.sql.functions.{col, explode, from_json}

doc.select(
  $"object_id",
  $"object_version",
  from_json($"doc_content", schema).alias("doc_content")
).select(
  $"object_id",
  $"object_version",
  col("doc_content.id").alias("id"),
  col("doc_content.pub_date").alias("pub_date"),
  col("doc_content.doc_id").alias("doc_id"),
  col("doc_content.unique_id").alias("unique_id"),
  explode(col("doc_content.content")).alias("content")
).select(
  $"id",
  $"pub_date",
  $"doc_id",
  $"unique_id",
  col("content.c_id").alias("c_id"),
  col("content.p_id").alias("p_id"),
  col("content.type").alias("type"),
  col("content.source").alias("source"),
  $"object_id",
  $"object_version"
)
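A slightly more compact variant of the same idea (just a sketch, assuming the same schema value as above) expands the parsed struct with a star instead of aliasing each field by hand:

doc
  .withColumn("doc_content", from_json($"doc_content", schema))
  .select($"doc_content.*", $"object_id", $"object_version") // expands id, pub_date, doc_id, unique_id, content
  .withColumn("content", explode($"content"))
  .select(
    $"id", $"pub_date", $"doc_id", $"unique_id",
    col("content.c_id").alias("c_id"),
    col("content.p_id").alias("p_id"),
    col("content.type").alias("type"),
    col("content.source").alias("source"),
    $"object_id", $"object_version"
  )

Either way, from_json and explode are evaluated row by row without shuffling data, so this should behave fine over millions of rows.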