Scala: Irregular behaviour of foldLeft() vs foreach()

Time:10-08

I tried two approaches to apply a Spark UDF to a subset of columns, but the two functions behave completely differently even though they are meant to do the same thing. Can someone explain what is happening internally in each case?

Function 1:

def transformColumns(df: DataFrame, transformationType: String, sanitizationList: List[Sanitization]): DataFrame = {

    try {
      sanitizationList.foldLeft(df) {
        (outerAccumulator: DataFrame, sanitization: Sanitization) =>
          val aes: TAlgorithm = new AES256(key, iv)
          @transient lazy val udfFunction = udf(aes.decrypt(_))
          sanitization.column.foldLeft(outerAccumulator: DataFrame) {
            (innerAccumulator: DataFrame, elem: String) =>
              innerAccumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
          }
      }
    }
}

Function 2:

def transformColumns(df: DataFrame, columns: Map[Seq[String], TAlgorithm]): DataFrame = {

    try {
      columns.foldLeft(df) {
        (accumulator: DataFrame, sanitization: (Seq[String], TAlgorithm)) =>
          import org.apache.spark.sql.functions.udf
          val aes: TAlgorithm = new AES256(key, iv)
          @transient lazy val udfFunction = udf(aes.decrypt(_))
          sanitization._1.foreach{
            elem => accumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
          }
          accumulator
      }
    }
}

In the second case none of the columns are transformed, and I am not sure why.

Answer:

  • In your 1st example

    sanitization.column.foldLeft(outerAccumulator: DataFrame) {
      (innerAccumulator: DataFrame, elem: String) =>
        innerAccumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
    }
    

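    foldLeft evaluates the function body, and the value of its last expression (the DataFrame returned by innerAccumulator.withColumn...) is passed in as innerAccumulator on the next iteration. Each column transformation therefore builds on the previous one, and the final DataFrame carries all of them.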

  • In your 2nd example

    sanitization._1.foreach{
      elem => accumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
    }
    

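    DataFrames are immutable, so withColumn does not modify accumulator; it returns a new DataFrame. Since foreach returns Unit, every new DataFrame created by accumulator.withColumn is discarded, and the accumulator you return on the last line is still the original, untransformed one.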

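The difference between the two examples can be reproduced without Spark. The following is a minimal sketch, assuming a hypothetical `Frame` case class as a stand-in for an immutable DataFrame; `Frame`, `FoldVsForeach`, `viaForeach`, `viaFoldLeft` and the placeholder value "decrypted" are illustrative names that do not come from the question.

```scala
// Hypothetical stand-in for an immutable DataFrame:
// withColumn never changes the receiver, it returns a new copy.
final case class Frame(columns: Map[String, String]) {
  def withColumn(name: String, value: String): Frame =
    Frame(columns + (name -> value))
}

object FoldVsForeach {
  val start: Frame = Frame(Map("a" -> "raw", "b" -> "raw"))
  val names: Seq[String] = Seq("a", "b")

  // foreach returns Unit, so each Frame produced by withColumn is thrown away.
  def viaForeach(df: Frame): Frame = {
    names.foreach(n => df.withColumn(n, "decrypted"))
    df // still the original Frame, nothing was transformed
  }

  // foldLeft feeds the Frame returned by each step into the next step.
  def viaFoldLeft(df: Frame): Frame =
    names.foldLeft(df)((acc, n) => acc.withColumn(n, "decrypted"))
}
```

Applied to the second function in the question, this suggests replacing `sanitization._1.foreach { ... }` followed by `accumulator` with `sanitization._1.foldLeft(accumulator) { (acc, elem) => acc.withColumn(...) }`, which is the same pattern the first function already uses.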