I have a dataset with this schema:

root
 |-- id: long (nullable = true)
 |-- dist: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)

Say dist is [(A, 10), (B, 5), (C, 3)], and I want to add a column [10/18, 5/18, 3/18], which corresponds to each value divided by the sum of the values. How can I do that? Thank you all.
CodePudding user response:
Assuming you're starting with a Dataset of something like this (updated to handle Option[Map[String, Long]]):
import org.apache.spark.sql.Dataset
import spark.implicits._ // needed for .toDS(); `spark` is your SparkSession

case class Model(id: Long, dist: Option[Map[String, Long]])

val ds: Dataset[Model] = List(
  Model(1, Some(Map("a" -> 10, "b" -> 5, "c" -> 3))),
  Model(2, Some(Map("x" -> 100, "y" -> 50, "z" -> 30))),
  Model(3, None)
).toDS()
and you want your new column percentage, which will need to be a String column in order to display {x}/{y}:
Spark >= 3.0.0
import org.apache.spark.sql.functions.{aggregate, col, format_string, lit, map_values, transform}
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.LongType

ds
  // take just the values of the map `dist`
  .withColumn("values", map_values(col("dist")))
  // calculate the total per row
  .withColumn("total", aggregate(col("values"), lit(0).cast(LongType), (acc, x) => acc + x))
  // format each value as "value/total"
  .withColumn("percentage", transform(col("values"), (c: Column) => format_string("%s/%s", c, col("total"))))
  .drop("total", "values")
  .show(false)
gives:
+---+----------------------------+-------------------------+
|id |dist                        |percentage               |
+---+----------------------------+-------------------------+
|1  |{a -> 10, b -> 5, c -> 3}   |[10/18, 5/18, 3/18]      |
|2  |{x -> 100, y -> 50, z -> 30}|[100/180, 50/180, 30/180]|
|3  |null                        |null                     |
+---+----------------------------+-------------------------+
which has a schema of:
root
 |-- id: long (nullable = false)
 |-- dist: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = false)
 |-- percentage: array (nullable = true)
 |    |-- element: string (containsNull = false)
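As an aside, if you'd rather have actual numeric fractions than {x}/{y} strings, a minimal sketch (still Spark >= 3.0.0, same ds as above, illustrative column names) could divide inside the transform instead of formatting:

// A sketch, not part of the original answer: produce array<double> ratios
// instead of formatted strings.
ds
  .withColumn("values", map_values(col("dist")))
  .withColumn("total", aggregate(col("values"), lit(0).cast(LongType), (acc, x) => acc + x))
  // cast to double first so the division isn't integer division
  .withColumn("ratio", transform(col("values"), (v: Column) => v.cast("double") / col("total")))
  .drop("total", "values")
  .show(false)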
Spark < 3.0.0 (Tested using Spark 2.4.5)
import org.apache.spark.sql.functions.{col, expr, map_values, udf}

val toDivisionUDF = udf(
  (values: Seq[String], total: Long) =>
    "[" + values.map(v => s"$v/$total").mkString(", ") + "]"
)
ds
  // take just the values of the map `dist`
  .withColumn("values", map_values(col("dist")))
  // calculate the total per row (0D so the SQL lambda type-checks against the bigint values)
  .withColumn("total", expr("aggregate(values, 0D, (acc, v) -> acc + v)"))
  // format the whole array as a single "[x/total, ...]" string
  .withColumn("percentage", toDivisionUDF(col("values"), col("total")))
  .drop("total", "values")
  .show(false)
gives:
+---+----------------------------+-------------------------+
|id |dist                        |percentage               |
+---+----------------------------+-------------------------+
|1  |[a -> 10, b -> 5, c -> 3]   |[10/18, 5/18, 3/18]      |
|2  |[x -> 100, y -> 50, z -> 30]|[100/180, 50/180, 30/180]|
|3  |null                        |null                     |
+---+----------------------------+-------------------------+
which has a schema of:
root
 |-- id: long (nullable = false)
 |-- dist: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = false)
 |-- percentage: string (nullable = true)
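Incidentally, Spark 2.4's SQL layer already supports higher-order functions, so a UDF-free sketch is also possible (same ds; note it yields an array<string> per row rather than one formatted string):

// A sketch, not part of the original answer: Spark 2.4 exposes transform/
// aggregate through SQL expressions, so the UDF can be avoided entirely.
ds
  .withColumn("values", map_values(col("dist")))
  .withColumn("total", expr("aggregate(values, 0L, (acc, v) -> acc + v)"))
  // concat implicitly casts its numeric arguments to string
  .withColumn("percentage", expr("transform(values, v -> concat(v, '/', total))"))
  .drop("total", "values")
  .show(false)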
Note that the inclusion of Option hasn't changed the code at all, as Spark handles Option transparently, treating the value inside the Option as nullable.
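If you want to see that null handling in isolation, a quick illustrative check is to look at just the extracted values:

// The None row surfaces as a null map, and map_values simply propagates
// that null downstream, so no explicit Option handling is needed anywhere
// in the pipeline.
ds
  .withColumn("values", map_values(col("dist")))
  .select("id", "values")
  .show(false)
// the row with id = 3 shows null for `values`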