Using Spark converting nested json with optional fields to Scala case class not working


I have a use case where I need to read a JSON file or JSON string with Spark as a Dataset[T] in Scala. The JSON has nested elements, and some of them are optional. I am able to read the JSON file and map it to the case classes if I ignore the optional fields, since the schema then matches the case classes.
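Roughly, the working version (without the optional field) looks like this; the names here are simplified placeholders rather than my real schema:

import org.apache.spark.sql.{Dataset, SparkSession}

case class Inner(requiredKey: String)
case class Record(field1: String, field2: String, inner: Inner)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// This works as long as every field in the case classes is present in the JSON
val ds: Dataset[Record] =
  spark.read.option("multiline", "true").json("/path/to/input.json").as[Record]
ds.show(false)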

(The code I am trying was posted as a screenshot in the original question and is omitted here.)

If I remove the optional field from the case class, this code works fine and shows the expected output.

Is there any way to make this optional field in the inner JSON / inner case class work, so that it can be cast directly to the respective case class inside Dataset[T]?

Any ideas, guidance, or suggestions that could make this work would be a great help.

CodePudding user response:

The problem is that Spark uses struct types to map a class to a Row. Take this as an example:

case class MyRow(a: String, b: String, c: Option[String])
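
For reference, here is a quick way to see the struct type the encoder derives for this class (a small sketch); the Option field just becomes a nullable string column, so the column itself still has to exist:

import org.apache.spark.sql.Encoders

// The encoder-derived schema for MyRow above: c is only a nullable column
Encoders.product[MyRow].schema.printTreeString()
// root
//  |-- a: string (nullable = true)
//  |-- b: string (nullable = true)
//  |-- c: string (nullable = true)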

Can Spark create a DataFrame which sometimes has column c and sometimes does not? Like this:

+-----+-----+-----+
|  a  |  b  |  c  |
+-----+-----+-----+
| a1  | b1  |  c1 |
+-----+-----+-----+
| a2  | b2  |        <-- note the non-existence here :)
+-----+-----+-----+
| a3  | b3  | c3  |
+-----+-----+-----+

Well, it cannot. Being nullable means the key has to exist, but the value can be null:

    ... other key values
    "optionalKey": null,
    ...

This is considered valid and is convertible to your structs. I suggest you use a dedicated JSON library (as you know, there are many of them out there) and use UDFs or something similar to extract what you need from the JSON.
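
As a minimal sketch of that suggestion, here is one way to do it with json4s (a library Spark already depends on, though any JSON library would do), parsing each raw JSON string into the MyRow class from above; the Dataset[String] of raw lines is assumed for illustration:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

def parseRow(json: String): MyRow = {
  implicit val formats: Formats = DefaultFormats
  // A missing "c" key and an explicit null both extract to None
  parse(json).extract[MyRow]
}

// Usage inside Spark, mapping over the raw JSON strings:
// import spark.implicits._
// val rows: Dataset[MyRow] = rawJsonLines.map(parseRow)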

CodePudding user response:

I tested the above code with the following case class structures:

case class Field3Array(
  key1: String,
  key2: List[String]
)
case class Input(
  field1: String,
  field2: String,
  field3Array: List[Field3Array]
)
case class Output(
  field1: String,
  field2: String,
  requiredKey: String,
  field3Array: List[Field3Array]
)
case class Root(
  Input: Input,
  Output: Output
)

The JSON string cannot be passed directly to the DataFrameReader as you have tried, since the json method expects a path. I put the JSON string in a file, passed the file path to the DataFrameReader, and the results were as follows:

import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.Dataset

case class Field3Array(
  key1: String,
  key2: List[String]
)
case class Input(
  field1: String,
  field2: String,
  field3Array: List[Field3Array]
)
case class Output(
  field1: String,
  field2: String,
  requiredKey: String,
  field3Array: List[Field3Array]
)
case class Root(
  Input: Input,
  Output: Output
)


// Path to the JSON file on the local filesystem (placeholder)
val pathToJson: String = "file:////path/to/json/file/on/local/filesystem"

// Declared implicit so that .as[Root] below can resolve it
// (importing spark.implicits._ would work as well)
implicit val jsEncoder: Encoder[Root] = Encoders.product[Root]

val df: Dataset[Root] = spark.read.option("multiline","true").json(pathToJson).as[Root]

The results of show are as follows:

df.show(false)

+--------------------------------------------+--------------------------------------------------------------+
|Input                                       |Output                                                        |
+--------------------------------------------+--------------------------------------------------------------+
|[Test1, Test2, [[Key123, [keyxyz, keyAbc]]]]|[Test2, Test3, [[Key123, [keyxyz, keyAbc]]], RequiredKeyValue]|
+--------------------------------------------+--------------------------------------------------------------+

df.select("Input.field1").show()

+------+
|field1|
+------+
| Test1|
+------+