I have a use case where I need to read a JSON file or JSON string with Spark as a Dataset[T] in Scala. The JSON has nested elements, and some of them are optional. I am able to read the JSON file and map it to the case class when I ignore the optional fields, since the schema then matches the case class. If I remove the optional field from the case class, the code also works fine and shows the expected output.
Is there any way to make such an optional field in the inner JSON (and the inner case class) work, so that the data can be cast directly to the corresponding case class inside a Dataset[T]?
Any ideas, guidance, or suggestions that make this work would be a great help.
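For concreteness, a minimal sketch of the kind of setup described, with hypothetical names and paths (the optional field lives in a nested case class):

import org.apache.spark.sql.SparkSession

// Hypothetical structures: optionalKey appears only in some records
case class Inner(requiredKey: String, optionalKey: Option[String])
case class Record(field1: String, inner: Inner)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Works while the case class only lists fields that exist in the data;
// when optionalKey never appears in the file, the inferred schema has no
// such column, so .as[Record] fails to resolve it.
val ds = spark.read.option("multiline", "true").json("/path/to/records.json").as[Record]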
CodePudding user response:
The problem is that Spark uses struct types to map a class to a Row. Take this as an example:
case class MyRow(a: String, b: String, c: Option[String])
Can Spark create a DataFrame which sometimes has column c and sometimes not? Like this:
+-----+-----+-----+
|  a  |  b  |  c  |
+-----+-----+-----+
| a1  | b1  | c1  |
+-----+-----+-----+
| a2  | b2  |        <-- note the non-existence here :)
+-----+-----+-----+
| a3  | b3  | c3  |
+-----+-----+-----+
Well, it cannot. Being nullable means the key has to exist, but the value can be null:
... other key values
"optionalKey": null,
...
This is considered valid and is convertible to your structs. I suggest you use a dedicated JSON library (as you know, there are many of them out there) and use a UDF or something similar to extract what you need from the JSON.
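For example, a minimal sketch of that idea, assuming circe as the JSON library and a hypothetical rawJson string column that still holds the original document (both are assumptions, not part of the question):

import org.apache.spark.sql.functions.{col, udf}
import io.circe.parser.parse

// Parse the raw JSON and pull out "optionalKey" when it is present;
// None becomes null in the resulting column.
val extractOptionalKey = udf { (raw: String) =>
  parse(raw).toOption
    .flatMap(_.hcursor.downField("optionalKey").as[String].toOption)
}

// df is assumed to be an existing DataFrame carrying the rawJson column.
val withOptional = df.withColumn("optionalKey", extractOptionalKey(col("rawJson")))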
CodePudding user response:
I tested the above with the following case class structures:
case class Field3Array(
  key1: String,
  key2: List[String]
)

case class Input(
  field1: String,
  field2: String,
  field3Array: List[Field3Array]
)

case class Output(
  field1: String,
  field2: String,
  requiredKey: String,
  field3Array: List[Field3Array]
)

case class Root(
  Input: Input,
  Output: Output
)
The JSON string cannot be passed directly to the DataFrameReader as you have tried, since the json method expects a path rather than raw JSON content.
I put the JSON string in a file and passed the file path to the DataFrameReader; the code and results follow.
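Based on the show output below, the file contents were presumably along these lines (reconstructed for illustration, not taken verbatim from the question):

{
  "Input": {
    "field1": "Test1",
    "field2": "Test2",
    "field3Array": [
      { "key1": "Key123", "key2": ["keyxyz", "keyAbc"] }
    ]
  },
  "Output": {
    "field1": "Test2",
    "field2": "Test3",
    "field3Array": [
      { "key1": "Key123", "key2": ["keyxyz", "keyAbc"] }
    ],
    "requiredKey": "RequiredKeyValue"
  }
}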
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.Dataset
// Path to the JSON file on the local filesystem
val pathToJson: String = "file:////path/to/json/file/on/local/filesystem"

// Explicit encoder for Root; declared implicit so that .as[Root] can pick it up
implicit val jsEncoder: Encoder[Root] = Encoders.product[Root]

// multiline is required because the JSON document spans multiple lines
val df: Dataset[Root] = spark.read.option("multiline", "true").json(pathToJson).as[Root]
The results of show are as follows:
df.show(false)
+--------------------------------------------+--------------------------------------------------------------+
|Input                                       |Output                                                        |
+--------------------------------------------+--------------------------------------------------------------+
|[Test1, Test2, [[Key123, [keyxyz, keyAbc]]]]|[Test2, Test3, [[Key123, [keyxyz, keyAbc]]], RequiredKeyValue]|
+--------------------------------------------+--------------------------------------------------------------+
df.select("Input.field1").show()
+------+
|field1|
+------+
| Test1|
+------+
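Since df is a typed Dataset[Root], the nested values can also be reached through the case classes. An untested sketch along those lines (assumes spark.implicits._ is in scope to provide the String encoder):

import spark.implicits._

// Typed access: map over the Dataset[Root] and read the nested required field
df.map(_.Output.requiredKey).show()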