Below are two solutions I tried for the same use case: applying a Spark UDF to some of the columns of a DataFrame. Both functions are meant to do the same thing, yet they behave completely differently, and I am not sure why. Can someone explain what exactly happens internally in each case?
Function 1:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, udf, when}

// key, iv, Sanitization, TAlgorithm and AES256 are defined elsewhere in my code
def transformColumns(df: DataFrame, transformationType: String, sanitizationList: List[Sanitization]): DataFrame = {
  try {
    sanitizationList.foldLeft(df) {
      (outerAccumulator: DataFrame, sanitization: Sanitization) =>
        val aes: TAlgorithm = new AES256(key, iv)
        @transient lazy val udfFunction = udf(aes.decrypt(_))
        sanitization.column.foldLeft(outerAccumulator: DataFrame) {
          (innerAccumulator: DataFrame, elem: String) =>
            innerAccumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
        }
    }
  }
}
Function 2:
def transformColumns(df: DataFrame, columns: Map[Seq[String], TAlgorithm]): DataFrame = {
  try {
    columns.foldLeft(df) {
      (accumulator: DataFrame, sanitization: (Seq[String], TAlgorithm)) =>
        import org.apache.spark.sql.functions.udf
        val aes: TAlgorithm = new AES256(key, iv)
        @transient lazy val udfFunction = udf(aes.decrypt(_))
        sanitization._1.foreach {
          elem => accumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
        }
        accumulator
    }
  }
}
In the second case none of the columns are transformed, and I am not sure why.
Answer:
In your 1st example:

  sanitization.column.foldLeft(outerAccumulator: DataFrame) {
    (innerAccumulator: DataFrame, elem: String) =>
      innerAccumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
  }

foldLeft evaluates the last expression of the lambda (innerAccumulator.withColumn(...)), and its result is passed in as innerAccumulator: DataFrame at the next iteration.

In your 2nd example:
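
  sanitization._1.foreach { elem =>
    accumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
  }

DataFrames are immutable, so withColumn returns a new DataFrame instead of modifying accumulator. But since foreach returns Unit, each new DataFrame created by accumulator.withColumn is discarded, and the lambda then returns the original, untouched accumulator. As a result, the outer fold returns df unchanged.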
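To see the difference without a Spark cluster, here is a minimal sketch that replaces DataFrame with a hypothetical immutable Frame case class whose withColumn, like Spark's, returns a new value instead of mutating the receiver. Frame, transform and FoldVsForeach are illustrative assumptions, not Spark APIs; transform simply upper-cases a value as a stand-in for aes.decrypt.

```scala
// Hypothetical stand-in for Spark's DataFrame: an immutable value whose
// withColumn returns a NEW instance and never mutates the receiver.
final case class Frame(columns: Map[String, String]) {
  def withColumn(name: String, value: String): Frame =
    copy(columns = columns.updated(name, value))
}

object FoldVsForeach {
  // Assumed stand-in for the decrypt UDF: upper-cases the value.
  def transform(v: String): String = v.toUpperCase

  // Mirrors Function 2: foreach returns Unit, so every new Frame built
  // inside the loop is discarded and the original frame is returned.
  def broken(frame: Frame, cols: Seq[String]): Frame = {
    cols.foreach { c => frame.withColumn(c, transform(frame.columns(c))) }
    frame
  }

  // Mirrors Function 1: foldLeft feeds each new Frame in as the next accumulator.
  def fixed(frame: Frame, cols: Seq[String]): Frame =
    cols.foldLeft(frame) { (acc, c) => acc.withColumn(c, transform(acc.columns(c))) }

  def main(args: Array[String]): Unit = {
    val input = Frame(Map("a" -> "x", "b" -> "y"))
    println(broken(input, Seq("a", "b"))) // columns a and b are unchanged
    println(fixed(input, Seq("a", "b")))  // both columns are upper-cased
  }
}
```

Applied to Function 2, the same fix would be to replace the foreach and the trailing accumulator with sanitization._1.foldLeft(accumulator) { (acc, elem) => acc.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null))) }, so that the inner fold's result is what the outer lambda returns.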