Spark Facade for CaseWhen


I'm trying to make a function that acts as a facade for Spark's CaseWhen expression.

CaseWhen(
    branches: Seq[(Expression, Expression)],
    elseValue: Option[Expression] = None): ...

So, CaseWhen takes a Seq of (Expression, Expression) pairs, where the first Expression is the condition and the second is the value. It also takes an Option[Expression] for the else value.
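For reference, Spark's built-in when/otherwise API constructs this same expression under the hood; a minimal sketch (the column name and values here are made up):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, when}

// when(condition, value) wraps CaseWhen(Seq(condition.expr -> value.expr));
// .otherwise(default) fills in the elseValue
val example: Column = when(col("column_one") === 1, lit(2)).otherwise(lit(0))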

Basically, I want to pass a (possibly nested) Map[Column, Column] object, where each key Column is a condition and each value Column is the value to use when that condition is satisfied.

I managed to do it for the simplest use case: a single level of (condition, value) pairs. Here's my code:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.expressions.{CaseWhen, Expression}

def withExpr(expr: Expression): Column = new Column(expr)

def caseWhenFacade(
    outputColName: String,
    conditionsValues: Map[Column, Column],
    defaultValue: Option[Column] = None
)(df: DataFrame): DataFrame = {
  require(conditionsValues.nonEmpty, "Cannot call caseWhenFacade with an empty Map")

  // CaseWhen expects Seq[(Expression, Expression)], so unwrap each Column
  val conditionalMap = conditionsValues.map { case (cond, value) => (cond.expr, value.expr) }.toSeq

  defaultValue match {
    case Some(value) => df.withColumn(
      outputColName,
      withExpr { CaseWhen(conditionalMap, value.expr) }
    )
    case None => df.withColumn(
      outputColName,
      withExpr { CaseWhen(conditionalMap) }
    )
  }
}

Here's how I use that function:

val spec = Map(
    ($"column_one" === 1) -> lit(2),
    ($"column_one" === 2 && $"column_two" === 1) -> lit(1),
    ($"column_one" === 3) -> lit(4),
)
df.transform(caseWhenFacade("column_three", spec))

This works great, but now I want to pass this spec Map:

val spec: Map[Column, Any] = Map(
    ($"column_one" === 1) -> Map(
        ($"column_two" === 2) -> lit(54),
        ($"column_two" === 5) -> lit(524)
    ),
    ($"column_one" === 2) -> Map(
        ($"column_two" === 7) -> Map(
            ($"whatever_column" === "whatever") -> lit(12),
            ($"whatever_column" === "whatever_two") -> lit(13)
        ),
        ($"column_two" === 8) -> lit(524)
    ),
    ($"column_one" === 3) -> lit(4)
)

This, obviously, doesn't work with my caseWhenFacade function, and I can't really figure out a way to do it.

Can anyone help me out?

CodePudding user response:

Well, that's because (as you know) an instance of Map[Column, Any] is not an instance of Spark's Column. I can think of two approaches. The first is unsafe, and I'm not going to explain it deeply, but to give you some idea: you can keep conditionsValues: Map[Column, Any] in your caseWhenFacade and recursively inline the nested conditions into a single Map[Column, Column] by combining the keys with && (see the sketch right below). The second approach, which I would recommend, is very similar but safe: you can make an ADT representing what your conditions and values look like, shown after the sketch.
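A minimal sketch of that unsafe flattening, assuming every value in the Map is either a nested Map or a plain Column (flattenSpec is a name I made up; the asInstanceOf is unavoidable because the element types are erased, which is exactly what makes this approach unsafe):

import org.apache.spark.sql.Column

def flattenSpec(spec: Map[Column, Any]): Seq[(Column, Column)] =
  spec.toSeq.flatMap {
    // erased types: we can only hope the inner keys/values really are Columns
    case (cond, nested: Map[_, _]) =>
      flattenSpec(nested.asInstanceOf[Map[Column, Any]]).map {
        case (innerCond, value) => (cond && innerCond) -> value
      }
    case (cond, value: Column) =>
      Seq(cond -> value)
    // anything else only blows up at runtime
    case (_, other) =>
      throw new IllegalArgumentException(s"Unsupported spec value: $other")
  }

And here is the ADT for the safe approach: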

import org.apache.spark.sql.Column

sealed trait ConditionValue {
  def enumerate(conditionReduce: (Column, Column) => Column): List[(Column, Column)]
}

case class SingleConditionValue(condition: Column, value: Column) extends ConditionValue {
  override def enumerate(conditionReduce: (Column, Column) => Column): List[(Column, Column)] =
    (condition -> value) :: Nil
}

case class NestedConditionValue(conditions: Map[Column, ConditionValue]) extends ConditionValue {
  override def enumerate(conditionReduce: (Column, Column) => Column): List[(Column, Column)] =
    conditions.toList.flatMap { case (outerCondition, inner) =>
      inner.enumerate(conditionReduce).map { case (innerCondition, innerValue) =>
        conditionReduce(outerCondition, innerCondition) -> innerValue
      }
    }
}

To take a quick look at what this modeling means, consider the first branch of your nested Map object:

val spec: Map[Column, Any] = Map(
  ($"column_one" === 1) -> Map(
      ($"column_two" === 2) -> lit(54),
      ($"column_two" === 5) -> lit(524)
  )
)

Another way to express this map would be this:

val spec: Map[Column, Column] = Map(
  (($"column_one" === 1) && ($"column_two" === 2)) -> lit(54),
  (($"column_one" === 1) && ($"column_two" === 5)) -> lit(524)
)

Right? As you can see, we are reducing two conditions into one using &&. That is exactly what conditionReduce is meant to do in the enumerate method: by reducing, I mean collapsing the outer nested map into a single flat map in which the nested conditions of each branch are combined into one. In the end, your code would look something like this:

def caseWhenFacade(
  outputColName: String,
  conditionsValues: ConditionValue,
  defaultValue: Option[Column] = None
)(df: DataFrame): DataFrame = {
  // this can be defined globally
  val and: (Column, Column) => Column = _ && _

  // do the 'and' operation on the conditions while flattening the spec
  val flattened: List[(Column, Column)] = conditionsValues.enumerate(and)
  require(flattened.nonEmpty, "Cannot call caseWhenFacade with an empty spec")

  // CaseWhen expects Seq[(Expression, Expression)]
  val conditionalMap = flattened.map { case (cond, value) => (cond.expr, value.expr) }

  defaultValue match {
    case Some(value) =>
      df.withColumn(
        outputColName,
        withExpr { CaseWhen(conditionalMap, value.expr) }
      )
    case None =>
      df.withColumn(
        outputColName,
        withExpr { CaseWhen(conditionalMap) }
      )
  }
}

One more thing to mention: you now need to wrap your conditions and values in instances of this ADT, or you can use implicit conversions to keep the call site close to the original Map-literal syntax, as sketched below.
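A minimal sketch of such conversions, assuming a bare value can be modeled as a SingleConditionValue whose condition is lit(true), since true is neutral under && (ConditionValueSyntax and its method names are mine):

import scala.language.implicitConversions
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit

object ConditionValueSyntax {
  // a bare Column value becomes a leaf; lit(true) is neutral under &&,
  // so the outer condition survives the reduction unchanged
  implicit def columnToLeaf(value: Column): ConditionValue =
    SingleConditionValue(lit(true), value)

  implicit def mapToNested(m: Map[Column, ConditionValue]): ConditionValue =
    NestedConditionValue(m)
}

With these in scope, the call site stays close to your original Map literal, and any value that is neither a Column nor a nested Map is rejected at compile time:

import ConditionValueSyntax._ // assumes spark.implicits._ is also in scope for $

val spec: ConditionValue = Map[Column, ConditionValue](
  ($"column_one" === 1) -> Map[Column, ConditionValue](
    ($"column_two" === 2) -> lit(54),
    ($"column_two" === 5) -> lit(524)
  ),
  ($"column_one" === 3) -> lit(4)
)

df.transform(caseWhenFacade("column_three", spec))

The redundant AND true that each lit(true) leaf leaves behind in the flattened conditions is removed by Catalyst's boolean simplification rule at optimization time.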
