Scope DataFrame transformations in Spark


I need to transform some DataFrame rows for which a specific flag is set and leave all other rows untouched.

df.withColumn("a", when($"flag".isNotNull, lit(1)).otherwise($"a"))
  .withColumn("b", when($"flag".isNotNull, "$b"   1).otherwise($"b"))
  .withColumn("c", when($"flag".isNotNull, concat($"c", "  ")).otherwise($"c"))

There might be more columns like that, and I am looking for a way to refactor this into something nicer.

I thought about:

df.filter($"flag".isNotNull)
  .withColumn("a", lit(1))
  .withColumn("b", $"b"   1)
  .withColumn("c", concat($"c", "  "))
  .union(df.filter($"flag".isNull))

but it scans/recalculates df twice. Even if I cache it, the plan contains the lineage of each branch separately, and since I actually chain multiple similar transformations, the final plan explodes exponentially and crashes.
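A quick way to confirm the duplicated scan (assuming the example df above; explain() prints the query plan):

val unioned = df.filter($"flag".isNotNull)
  .withColumn("a", lit(1))
  .union(df.filter($"flag".isNull))
unioned.explain()  // the physical plan contains two scans of df's source

val single = df.withColumn("a", when($"flag".isNotNull, lit(1)).otherwise($"a"))
single.explain()   // one scan, one projection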

Would it be possible to implement something like:

df.withScope($"flag".isNotNull) { scoped =>
  scoped.withColumn("a", lit(1))
        .withColumn("b", $"b"   1)
        .withColumn("c", concat($"c", "  "))
}

CodePudding user response:

Using when expressions is ok. You can write something like this:

import org.apache.spark.sql.functions.{col, concat, lit, when}
import spark.implicits._  // for the $"..." column syntax

val updates = Map(
  "a" -> lit(1),
  "b" -> $"b" + 1,
  "c" -> concat($"c", lit("  "))
)

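// One pass over df: each listed column takes its new value where the flag
// is set and keeps its original value otherwise.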
val df2 = updates.foldLeft(df) { case (acc, (c, v)) =>
  acc.withColumn(c, when($"flag".isNotNull, v).otherwise(col(c)))
}
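
If you want the withScope shape from the question, a minimal sketch is to wrap that fold in an extension method (the name withScope and the updates-as-pairs signature are assumptions, not a built-in Spark API):

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, when}

implicit class ScopedOps(df: DataFrame) {
  // Rewrite each listed column so it takes the new value where cond holds
  // and keeps the original value elsewhere; one projection, no union.
  def withScope(cond: Column)(updates: (String, Column)*): DataFrame =
    updates.foldLeft(df) { case (acc, (name, value)) =>
      acc.withColumn(name, when(cond, value).otherwise(col(name)))
    }
}

which lets you write:

val df2 = df.withScope($"flag".isNotNull)(
  "a" -> lit(1),
  "b" -> $"b" + 1,
  "c" -> concat($"c", lit("  "))
)

Note this only works for columns that already exist in df; a brand-new column would need an explicit otherwise default instead of col(name).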