I have the code below to cache various partitions, save each one in a map, and then union them all, and I am getting the error "unionByName is not a member of Null".
var cache_map = Map[String, DataFrame]()
for (partition <- partitionlist) {
  val df_test = spark.read.format("delta")
    .load("abfs://<container>@<account>.dfs.core.windows.net/dirname")
    .where(col("dt").like(partition + "%"))
  cache_map += (partition -> df_test.cache())
}
val cache_keys = cache_map.keys
var df_union = null
for (key <- cache_keys) {
  if (df_union == null) {
    df_union = cache_map.get(key)
  } else {
    df_union = df_union.unionByName(cache_map.get(key))
  }
}
When I do the following:
cache_map.get("20221120").unionByName(cache_map.get("20221119"))
I get the below error
Error: value unionByName is not a member of Option[org.apache.spark.sql.DataFrame]
Can anyone help me with what's going wrong? I don't have as much experience with Spark in Scala as I have with PySpark. Any help is greatly appreciated.
CodePudding user response:
As stated in the exception, you are trying to call the function unionByName on an Option[DataFrame], and it is failing because there is no such function on the Option type. You need to either map over the Option or get the underlying DataFrame out of it before you can use DataFrame functions.
You can try, for example, something like this:
cache_map.get("20221120").map(_.unionByName(cache_map.get("20221119").getOrElse(spark.emptyDataFrame)))
CodePudding user response:
The error you are seeing is masking the real issue: you are going about the problem in the wrong way.
You are, as far as I can tell, reading in a (partitioned?) directory and then wanting only a subset of the data in a DataFrame in the variable df_union.
You are doing this by making multiple passes over the data, and each time you find a set of records you want, you save it into a map, before finally combining all of your result sets.
This can be achieved in a single pass with something like the following, where df stands in for your data:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import spark.implicits._ // for toDF on a Seq

val df = Seq(
  ("a1", 1),
  ("b1", 2),
  ("c1", 3),
  ("d1", 4),
  ("e1", 5)
).toDF("dt", "x")
// List of partition names you want to include in your final data
val partitionList: List[String] = List("a", "c", "e")
val filterCol: Column = col("dt")
// Build a single filter expression covering all partitions
val filterExpr: Column = partitionList.map(pt => filterCol.like(s"$pt%")).reduce(_ or _)
// Execute the filtering
val filteredDf = df.filter(filterExpr)
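With the sample data above, filteredDf contains the rows ("a1", 1), ("c1", 3), and ("e1", 5). Applied to the original question, the same single-pass idea would look roughly like this (the abfs path is a placeholder, and partitionlist and the dt column come from the question's code):

// Sketch: one read and one combined filter, instead of caching a
// DataFrame per partition and unioning them afterwards.
val df_union = spark.read.format("delta")
  .load("abfs://<container>@<account>.dfs.core.windows.net/dirname")
  .filter(partitionlist.map(pt => col("dt").like(s"$pt%")).reduce(_ or _))

This scans the source once with a single combined predicate rather than once per partition.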