I have a scala List object with a recursive definition of all operations I have to do with columns of a spark dataframe. For example, the operations
(C1 - C2) ( (C3 - C4)- (C5 -C6) )
are defined by the next scala List:
List("addition", List("substraction",List("C1","C2")),
List("substraction",
List("substraction",List("C3","C4")),
List("substraction"), List("C5","C6"))
)
where "C1",...,"C5"
are the names of the spark dataframes columns.
I would like to define a recursive scala function that gives me the final column result.
Does anyone know a way to do it?
CodePudding user response:
The way you define the operation is quite strange. You encapsulate column name operands in a list, but not complex operands. Therefore your lists can either have 2 or three elements. How would you define something like (A (B-C))
? I would start by fixing that and write your operation either like this (3 elements per list):
val list = List("addition",
List("substraction","C1","C2"),
List("substraction",
List("substraction","C3","C4"),
List("substraction", "C5","C6")
)
)
or like this (2 elements per list):
val list = List("addition", List(
List("substraction", List("C1","C2")),
List("substraction", List(
List("substraction", List("C3","C4")),
List("substraction", List("C5","C6"))
)))
)
The second version being much more verbose, let's pick the first one and write the recursive function:
def operation_to_col(operation : Any) : Column = {
operation match {
case x : String => col(x)
case List("addition", s1 : Any, s2 : Any) =>
operation_to_col(s1) operation_to_col(s2)
case List("substraction", s1 : Any, s2 : Any) =>
operation_to_col(s1) operation_to_col(s2)
}
}
CodePudding user response:
First, I am going to change the definition of the operations. For example, the operations
(C1 - C2) ( (C3 - C4)- (C5 -C6) )
are defined by the next scala List:
val list = List("addition",
List("substraction","C1","C2"),
List("substraction",
List("substraction","C3","C4"),
List("substraction", "C5","C6")
) )
I am going to create a dataframe for an example:
val data = Seq((1000, 1, 2,3,4,5), (2000,1,2,3,4,5), (3000,1,2,3,4,5))
val rdd = spark.sparkContext.parallelize(data)
val df = rdd.toDF("C1","C2","C3","C4","C5","C6")
The List of permitted operations is:
val operations=List("addition","subtraction","multiplication","division")
I created the next Map object to associate the operations and their symbols:
val oprSimbols:Map[String,String] = Map("addition"->" ", "substraction"-> "-", "multiplication"->"*","division"->"/")
Finally, I define the function that solves the problem:
def operation_to_col(df: DataFrame,oprSimbols: Map[String,String],
operations:List[String], list : Any) : DataFrame = {
list match {
case x if operations.contains(x.toString) => df.select(col(x.toString))
case List(oprName:String,x:String, y:String) =>{
val sym = oprSimbols(oprName)
val exprOpr = List(x,sym,y).mkString(" ")
df.selectExpr(exprOpr)}
case List(oprName:String, s1 : Any, s2 : Any) =>{
val df1 = operation_to_col(df,oprSimbols,operations,s1)
val df2 = operation_to_col(df,oprSimbols,operations,s2)
val sym = oprSimbols(oprName)
val exprOpr = List(df1.columns(0),sym,df2.columns(0)).mkString(" ")
df.selectExpr(exprOpr)
}
} }
We can check it:
operation_to_col(df,oprSimbols, operations, list )