Home > database >  Spark parse nested json with variable json keys
Spark parse nested json with variable json keys

Time:04-03

I have below structure

root

 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- programs: struct (nullable = true)
 |    |    |    |-- **{ program id }**: struct (nullable = true)
 |    |    |    |    |-- Date: timestamp (nullable = true)
 |    |    |    |    |-- Name: string (nullable = true)
 |    |    |    |    |-- Some_Flags: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = true)
 |    |    |    |    |    |-- def: boolean (nullable = true)
 |    |    |    |    |    |-- ghi: boolean (nullable = true)
 |    |    |    |    |    |-- xyz: boolean (nullable = true)




“groups” : [
 {
  … some other fields …
  “programs” : {
     “123c12b123456c1d76a4f265f10f20a0” : {
        “name” : “test_program_1”, 
        “some_flags” : {
           “abc” : true, 
           “def” : true, 
           “ghi” : false, 
           “xyz” : true
        }, 
        “date” : ISODate(“2019–11–16T03:29:00.000 0000”)
     }
 }
]

val data = spark.read.json("path").map(customParser) How do I use custom parser to map to case class?

data is coming from mongo db. Need to distributed parse so that I can iterate over each line.

CodePudding user response:

As the json document has a variable key (the program id is not a constant key but varies for each entry) Spark cannot infer the schema. One option is to process the document manually:

The case classes:

case class SomeFlags(abc: Boolean, def1: Boolean, ghi: Boolean, xyz: Boolean)
case class Program(var programId: String, date: String, name: String, someFlags: SomeFlags)
case class Group(programs: Array[Program])
case class Groups(groups: Array[Group])

The companion objects for extracting the data fields from the json string:


object Groups {
  def unapply(values: Map[String, Object]) = try {
    val groups = values("groups").asInstanceOf[List[Map[String, Object]]]
    val grps = new ListBuffer[Group]()
    for (group <- groups) {
      val Group(grp) = group
      grps  = grp
    }
    Some(Groups(Array(grps: _*)))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}
  
object Group {
  def unapply(values: Map[String, Object]) = try {
    val programs = values("programs").asInstanceOf[Map[String, Object]]
    val prgs = new ListBuffer[Program]()
    for ((k, v) <- programs) {
      val Program(prg) = v.asInstanceOf[Map[String, Object]];
      prg.programId = k;
      prgs  = prg;
    }
    Some(Group(Array(prgs: _*)))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}

object Program {
  def unapply(values: Map[String, Object]) = try {
    val SomeFlags(flags) = values("some_flags").asInstanceOf[Map[String, Object]]
    Some(Program("pid", values("date").asInstanceOf[String], values("name").asInstanceOf[String], flags))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}

object SomeFlags {
  def unapply(values: Map[String, Object]) = try {
    Some(SomeFlags(values("abc").asInstanceOf[Boolean], values("def").asInstanceOf[Boolean], values("ghi").asInstanceOf[Boolean], values("xyz").asInstanceOf[Boolean]))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}

The critical part here is inside of Group.unapply where the prg.programId is manually set to the key of the map containing all the programs.

Finally the Spark code. DataframeReader.textFile is used to read the file (each line should contain a whole Json document). The result is a Dataset[String] and any other datasouce that produces a dataframe containg one complete Json document per line will work too.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}

val ds: Dataset[String] = spark.read.textFile(<path to file>)

val ds2: Dataset[Groups] = ds.map(s => {
  val mapper = new ObjectMapper() with ScalaObjectMapper //https://stackoverflow.com/a/20034844/2129801
  mapper.registerModule(DefaultScalaModule)
  val obj = mapper.readValue[Map[String, Object]](s)
  val Groups(groups) = obj
  groups
})

ds2 now has the schema:

root
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- programs: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- programId: string (nullable = true)
 |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- someFlags: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = false)
 |    |    |    |    |    |-- def1: boolean (nullable = false)
 |    |    |    |    |    |-- ghi: boolean (nullable = false)
 |    |    |    |    |    |-- xyz: boolean (nullable = false)

Things to improve:

  • better error handling within the unapply methods
  • replace the map function with mapPartitions to improve the performance
  • Related