Scala Spark Add a Column with percentage of a number over sum


I have a dataset with the schema:

root
 |-- id: long (nullable = true)
 |-- dist: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)

Say the dist is [(A, 10), (B, 5), (C, 3)]. The values sum to 10 + 5 + 3 = 18, and I want to add a column [10/18, 5/18, 3/18], i.e. each value divided by the sum.

How can I do that? Thank you all.

CodePudding user response:

Assuming you're starting with a Dataset of something like (updated to handle Option[Map[String, Long]]):

import org.apache.spark.sql.Dataset

// `toDS()` needs the implicits of an active SparkSession (named `spark` here)
import spark.implicits._

case class Model(id: Long, dist: Option[Map[String, Long]])

val ds: Dataset[Model] = List(
  Model(1, Some(Map("a" -> 10, "b" -> 5, "c" -> 3))),
  Model(2, Some(Map("x" -> 100, "y" -> 50, "z" -> 30))),
  Model(3, None)
).toDS()
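
If your data is instead an untyped DataFrame with the question's schema, a minimal sketch of getting to the same typed Dataset (assuming the DataFrame is named df and the implicits above are in scope):

val ds: Dataset[Model] = df.as[Model]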

The new column percentage will need to be a String column to display {x}/{y}.

Spark >= 3.0.0

import org.apache.spark.sql.functions.{aggregate, col, format_string, lit, map_values, transform}
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.LongType

ds
  // take just the values of the map `dist`
  .withColumn("values", map_values(col("dist")))
  // calculate the total per row
  .withColumn("total", aggregate(col("values"), lit(0).cast(LongType), (acc, x) => acc + x))
  // format each value as {value}/{total}
  .withColumn("percentage", transform(col("values"), (c: Column) => format_string("%s/%s", c, col("total"))))
  .drop("total", "values")
  .show(false)

gives:

+---+----------------------------+-------------------------+
|id |dist                        |percentage               |
+---+----------------------------+-------------------------+
|1  |{a -> 10, b -> 5, c -> 3}   |[10/18, 5/18, 3/18]      |
|2  |{x -> 100, y -> 50, z -> 30}|[100/180, 50/180, 30/180]|
|3  |null                        |null                     |
+---+----------------------------+-------------------------+

which has a schema of:

root
 |-- id: long (nullable = false)
 |-- dist: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = false)
 |-- percentage: array (nullable = true)
 |    |-- element: string (containsNull = false)
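
If you want the actual numeric ratios rather than the formatted {x}/{y} strings, a minimal sketch using the same imports (the column name ratio is just illustrative):

ds
  // take just the values of the map `dist`
  .withColumn("values", map_values(col("dist")))
  // calculate the total per row
  .withColumn("total", aggregate(col("values"), lit(0L), (acc, x) => acc + x))
  // divide each value by the row total, giving an array<double> column
  .withColumn("ratio", transform(col("values"), (c: Column) => c.cast("double") / col("total")))
  .drop("total", "values")
  .show(false)

A null dist passes through as a null ratio, just as in the string version.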

Spark < 3.0.0 (Tested using Spark 2.4.5)

import org.apache.spark.sql.functions.{col, expr, map_values, udf}

val toDivisionUDF = udf(
  // e.g. Seq("10", "5", "3") and 18 become "[10/18, 5/18, 3/18]"
  (values: Seq[String], total: Long) =>
    "[" + values.map(v => s"$v/$total").mkString(", ") + "]"
)

ds
  // take just the values of the map `dist`
  .withColumn("values", map_values(col("dist")))
  // calculate the total per row (0D avoids an int/bigint type mismatch in the lambda)
  .withColumn("total", expr("aggregate(values, 0D, (acc, v) -> acc + v)"))
  // format the values and the total into a single display string
  .withColumn("percentage", toDivisionUDF(col("values"), col("total")))
  .drop("total", "values")
  .show(false)

gives:

+---+----------------------------+-------------------------+
|id |dist                        |percentage               |
+---+----------------------------+-------------------------+
|1  |[a -> 10, b -> 5, c -> 3]   |[10/18, 5/18, 3/18]      |
|2  |[x -> 100, y -> 50, z -> 30]|[100/180, 50/180, 30/180]|
|3  |null                        |null                     |
+---+----------------------------+-------------------------+

which has a schema of:

root
 |-- id: long (nullable = false)
 |-- dist: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = false)
 |-- percentage: string (nullable = true)

Note that the inclusion of Option hasn't changed the code at all, as Spark handles Option by treating the field inside it as nullable.
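
If you'd rather the pre-3.0.0 output match the array<string> percentage column of the Spark >= 3.0.0 version instead of a single formatted string, a minimal sketch is to return the sequence from the UDF directly (toDivisionsUDF is just an illustrative name):

val toDivisionsUDF = udf(
  // returning Seq[String] yields an array<string> column
  (values: Seq[String], total: Long) => values.map(v => s"$v/$total")
)

ds
  .withColumn("values", map_values(col("dist")))
  .withColumn("total", expr("aggregate(values, 0D, (acc, v) -> acc + v)"))
  .withColumn("percentage", toDivisionsUDF(col("values"), col("total")))
  .drop("total", "values")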
