In a Coursera course on Spark with Scala, I had a lesson about using the fillna and replace functions.
I tried to reproduce it to see how it works in practice, but I have a problem creating a DataFrame with values that are meant to be replaced.
I tried to do it with a JSON input file and with a sequence of tuples. In both cases I received exceptions.
Could you please advise what I have to do to create a DataFrame which contains null / NaN / None (maybe all of them; that would be the best scenario for learning purposes)?
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object HowToCreateDfWithNullsOrNaNs
{
  // local session used by both methods below
  val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

  def main(args: Array[String]): Unit =
  {
    fromFile()
  }

  def fromFile(): Unit =
  {
    // input_file.json: { "name": "Tom", "surname": null, "age": 10}
    val rddFromJson: RDD[String] = spark.sparkContext.textFile("src/main/resources/input_file.json")
    import spark.implicits._
    /*
    Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
    Old column names (1): value
    New column names (3): name, surname, age
    */
    rddFromJson.toDF("name", "surname", "age")
  }

  def fromSeq(): Unit =
  {
    val tupleSeq: Seq[(String, Any, Int)] = Seq(("Tom", null, 10))
    val rdd = spark.sparkContext.parallelize(tupleSeq)
    /*
    Exception in thread "main" java.lang.ClassNotFoundException: scala.Any
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:583)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    */
    import spark.implicits._
    rdd.toDF("name", "surname", "age")
  }
}
CodePudding user response:
Since you want to create DataFrames, I suggest not struggling with RDD-to-DataFrame conversions and instead creating, well, DataFrames. (Your fromFile attempt fails because textFile yields an RDD[String] with a single value column, so three column names can't match it; spark.read.json parses the file directly.)
For example, modifying your two examples:
// input_file.json: { "name": "Tom", "surname": null, "age": 10}
val df = spark.read.json("src/main/resources/input_file.json")
df.show()
+---+----+-------+
|age|name|surname|
+---+----+-------+
| 10| Tom|   null|
+---+----+-------+
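As a quick sanity check (assuming the df from the snippet above), you can confirm that the missing surname really arrives as a SQL null:
import spark.implicits._
// keep only the rows where surname is a SQL null
df.filter($"surname".isNull).show()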
val columns = Seq("name", "surname", "age")
val tupleSeq: Seq[(String, String, Int)] = Seq(("Tom", null, 10))
spark.createDataFrame(tupleSeq).toDF(columns: _*).show()
+----+-------+---+
|name|surname|age|
+----+-------+---+
| Tom|   null| 10|
+----+-------+---+
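If you want null, None and NaN in one DataFrame, as the question asks, one option (a minimal sketch, assuming the same spark session and import spark.implicits._ as above) is to use Option columns: None becomes a SQL null, while Double.NaN is an ordinary Double value and survives as NaN:
// Option[...] columns are nullable: None maps to a SQL null.
// Double.NaN is a plain Double value, so it stays as NaN.
val mixed = Seq(
  ("Tom",   Option.empty[String], Option(Double.NaN)),
  ("Anna",  Option("Smith"),      Option(1.75)),
  ("Jerry", Option("Mouse"),      Option.empty[Double])
).toDF("name", "surname", "height")
mixed.show()
+-----+-------+------+
| name|surname|height|
+-----+-------+------+
|  Tom|   null|   NaN|
| Anna|  Smith|  1.75|
|Jerry|  Mouse|  null|
+-----+-------+------+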
Note that in the 2nd example the type shouldn't be Any, for the reasons given in this answer: "All fields / columns in a Dataset have to be of known, homogeneous type for which there is an implicit Encoder in the scope. There is simply no place for Any there."
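With such a DataFrame you can then practice the functions from the lesson. Note that in the Scala API fillna is spelled df.na.fill; a short sketch using the mixed DataFrame from the example above:
// fill replaces nulls per column; for numeric columns it also replaces NaN
mixed.na.fill(Map("surname" -> "unknown", "height" -> -1.0)).show()
// drop rows containing any null or NaN value
mixed.na.drop().show()
// replace specific values in a column
mixed.na.replace("name", Map("Tom" -> "Thomas")).show()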