I have the following structure:
root
|-- groups: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- programs: struct (nullable = true)
| | | |-- **{ program id }**: struct (nullable = true)
| | | | |-- Date: timestamp (nullable = true)
| | | | |-- Name: string (nullable = true)
| | | | |-- Some_Flags: struct (nullable = true)
| | | | | |-- abc: boolean (nullable = true)
| | | | | |-- def: boolean (nullable = true)
| | | | | |-- ghi: boolean (nullable = true)
| | | | | |-- xyz: boolean (nullable = true)
"groups" : [
  {
    ... some other fields ...
    "programs" : {
      "123c12b123456c1d76a4f265f10f20a0" : {
        "name" : "test_program_1",
        "some_flags" : {
          "abc" : true,
          "def" : true,
          "ghi" : false,
          "xyz" : true
        },
        "date" : ISODate("2019-11-16T03:29:00.000+0000")
      }
    }
  }
]
val data = spark.read.json("path").map(customParser)
How do I use a custom parser to map this to a case class? The data comes from MongoDB, and I need to parse it in a distributed way so that I can iterate over each line.
Answer:
As the JSON document has a variable key (the program id is not a constant key but varies for each entry), Spark cannot infer the schema. One option is to process the document manually:
The case classes:
// "def" is a reserved word in Scala, so the field is named def1
case class SomeFlags(abc: Boolean, def1: Boolean, ghi: Boolean, xyz: Boolean)
// programId is a var so that Group.unapply can set it to the enclosing map key
case class Program(var programId: String, date: String, name: String, someFlags: SomeFlags)
case class Group(programs: Array[Program])
case class Groups(groups: Array[Group])
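For reference, the sample document above would populate these case classes roughly like this (the variable program-id key becomes the programId field):
// How the sample document maps onto the case classes
Groups(Array(Group(Array(
  Program(
    programId = "123c12b123456c1d76a4f265f10f20a0",
    date      = "2019-11-16T03:29:00.000+0000",
    name      = "test_program_1",
    someFlags = SomeFlags(abc = true, def1 = true, ghi = false, xyz = true)
  )
))))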
The companion objects for extracting the data fields from the parsed JSON:
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

object Groups {
  def unapply(values: Map[String, Object]) = try {
    val groups = values("groups").asInstanceOf[List[Map[String, Object]]]
    val grps = new ListBuffer[Group]()
    for (group <- groups) {
      val Group(grp) = group
      grps += grp // append, not assign
    }
    Some(Groups(Array(grps: _*)))
  } catch {
    case NonFatal(ex) =>
      println(ex)
      None
  }
}
object Group {
  def unapply(values: Map[String, Object]) = try {
    val programs = values("programs").asInstanceOf[Map[String, Object]]
    val prgs = new ListBuffer[Program]()
    for ((k, v) <- programs) {
      val Program(prg) = v.asInstanceOf[Map[String, Object]]
      prg.programId = k // the variable key becomes a regular field
      prgs += prg // append, not assign
    }
    Some(Group(Array(prgs: _*)))
  } catch {
    case NonFatal(ex) =>
      println(ex)
      None
  }
}
object Program {
  def unapply(values: Map[String, Object]) = try {
    val SomeFlags(flags) = values("some_flags").asInstanceOf[Map[String, Object]]
    // "pid" is a placeholder; Group.unapply overwrites it with the actual map key
    Some(Program("pid", values("date").asInstanceOf[String], values("name").asInstanceOf[String], flags))
  } catch {
    case NonFatal(ex) =>
      println(ex)
      None
  }
}
object SomeFlags {
  def unapply(values: Map[String, Object]) = try {
    Some(SomeFlags(values("abc").asInstanceOf[Boolean], values("def").asInstanceOf[Boolean], values("ghi").asInstanceOf[Boolean], values("xyz").asInstanceOf[Boolean]))
  } catch {
    case NonFatal(ex) =>
      println(ex)
      None
  }
}
The critical part here is inside Group.unapply, where prg.programId is manually set to the key of the map containing all the programs.
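As a quick sanity check, the extractor chain can be exercised on a hand-built map (a hypothetical test input, mirroring the Map/List structure Jackson produces for the sample document):
// Hand-built equivalent of the parsed sample document
val parsed: Map[String, Object] = Map(
  "groups" -> List(Map(
    "programs" -> Map(
      "123c12b123456c1d76a4f265f10f20a0" -> Map(
        "name" -> "test_program_1",
        "date" -> "2019-11-16T03:29:00.000+0000",
        "some_flags" -> Map("abc" -> true, "def" -> true, "ghi" -> false, "xyz" -> true)
      )
    )
  ))
)
val Groups(gs) = parsed
// gs.groups(0).programs(0).programId == "123c12b123456c1d76a4f265f10f20a0"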
Finally, the Spark code. DataFrameReader.textFile is used to read the file (each line should contain a whole JSON document). The result is a Dataset[String]; any other data source that produces one complete JSON document per line will work too.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.spark.sql.Dataset
import spark.implicits._

val ds: Dataset[String] = spark.read.textFile(<path to file>)
val ds2: Dataset[Groups] = ds.map(s => {
  val mapper = new ObjectMapper() with ScalaObjectMapper //https://stackoverflow.com/a/20034844/2129801
  mapper.registerModule(DefaultScalaModule)
  val obj = mapper.readValue[Map[String, Object]](s)
  val Groups(groups) = obj // throws a MatchError if unapply returns None
  groups
})
ds2 now has the schema:
root
|-- groups: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- programs: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- programId: string (nullable = true)
| | | | |-- date: string (nullable = true)
| | | | |-- name: string (nullable = true)
| | | | |-- someFlags: struct (nullable = true)
| | | | | |-- abc: boolean (nullable = false)
| | | | | |-- def1: boolean (nullable = false)
| | | | | |-- ghi: boolean (nullable = false)
| | | | | |-- xyz: boolean (nullable = false)
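As a possible follow-up (not part of the original answer), the nested structure can be flattened to one row per program using explode; this assumes spark.implicits._ is in scope for the $ syntax:
import org.apache.spark.sql.functions.explode

// One row per program, with the flags expanded into top-level columns
val programs = ds2
  .select(explode($"groups").as("group"))
  .select(explode($"group.programs").as("program"))
  .select("program.programId", "program.name", "program.date", "program.someFlags.*")
programs.show()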
Things to improve:
- better error handling within the unapply methods
- replace the map function with mapPartitions to improve performance (see the sketch below)
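A minimal sketch of that second point, assuming the same imports as above. It also improves the error handling in passing: None results from Groups.unapply are dropped instead of throwing a MatchError.
val ds3: Dataset[Groups] = ds.mapPartitions { lines =>
  // Build the ObjectMapper once per partition instead of once per record
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  // flatMap over the Option drops documents that fail to parse
  lines.flatMap(s => Groups.unapply(mapper.readValue[Map[String, Object]](s)))
}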