Spark Dataframe with pivot and different aggregation, based on the column value (measure_type) - Scala


I have a Spark dataframe of this type:

scala> val data = Seq((1, "k1", "measureA", 2), (1, "k1", "measureA", 4), (1, "k1", "measureB", 5), (1, "k1", "measureB", 7), (1, "k1", "measureC", 7), (1, "k1", "measureC", 1), (2, "k1", "measureB", 8), (2, "k1", "measureC", 9), (2, "k2", "measureA", 5), (2, "k2", "measureC", 5), (2, "k2", "measureC", 8))
data: Seq[(Int, String, String, Int)] = List((1,k1,measureA,2), (1,k1,measureA,4), (1,k1,measureB,5), (1,k1,measureB,7), (1,k1,measureC,7), (1,k1,measureC,1), (2,k1,measureB,8), (2,k1,measureC,9), (2,k2,measureA,5), (2,k2,measureC,5), (2,k2,measureC,8))

scala> val rdd = spark.sparkContext.parallelize(data)
rdd: org.apache.spark.rdd.RDD[(Int, String, String, Int)] = ParallelCollectionRDD[22] at parallelize at <console>:27

scala> val df = rdd.toDF("ts","key","measure_type","value")
df: org.apache.spark.sql.DataFrame = [ts: int, key: string ... 2 more fields]

scala> df.show
+---+---+------------+-----+
| ts|key|measure_type|value|
+---+---+------------+-----+
|  1| k1|    measureA|    2|
|  1| k1|    measureA|    4|
|  1| k1|    measureB|    5|
|  1| k1|    measureB|    7|
|  1| k1|    measureC|    7|
|  1| k1|    measureC|    1|
|  2| k1|    measureB|    8|
|  2| k1|    measureC|    9|
|  2| k2|    measureA|    5|
|  2| k2|    measureC|    5|
|  2| k2|    measureC|    8|
+---+---+------------+-----+

I want to pivot on measure_type and apply different aggregation types to the value, depending on measure_type:

  • measureA -> sum
  • measureB -> avg
  • measureC -> max

Then, I'd like to get the following output dataframe:

+---+---+--------+--------+--------+
| ts|key|measureA|measureB|measureC|
+---+---+--------+--------+--------+
|  1| k1|       6|       6|       7|
|  2| k1|    null|       8|       9|
|  2| k2|       5|    null|       8|
+---+---+--------+--------+--------+

Thanks a lot.

CodePudding user response:

import org.apache.spark.sql.functions.{avg, col, max, sum, when}

// Each when(...) has no otherwise, so rows with a different measure_type
// contribute null, which sum/avg/max simply ignore.
val ddf = df.groupBy("ts", "key").agg(
  sum(when(col("measure_type") === "measureA", col("value"))).as("measureA"),
  avg(when(col("measure_type") === "measureB", col("value"))).as("measureB"),
  max(when(col("measure_type") === "measureC", col("value"))).as("measureC"))

And the results are:

scala> ddf.show(false)
+---+---+--------+--------+--------+
|ts |key|measureA|measureB|measureC|
+---+---+--------+--------+--------+
|2  |k2 |5       |null    |8       |
|2  |k1 |null    |8.0     |9       |
|1  |k1 |6       |6.0     |7       |
+---+---+--------+--------+--------+
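
Note that avg always returns a double, which is why measureB comes back as 6.0 and 8.0 rather than the integers shown in the desired output. If the integer rendering matters, a cast fixes it; this extra step and the name ddfInt are my own, not part of the answer above:

// Hypothetical follow-up: cast the averaged column to int so it renders as
// 6 rather than 6.0 (note that this truncates any fractional part).
val ddfInt = ddf.withColumn("measureB", col("measureB").cast("int"))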

CodePudding user response:

I think it's tedious to do with the traditional pivot function, as it limits you to a single aggregate function.

Here is what I would do instead: map over a pre-defined list of aggregate functions and apply them all to the dataframe, giving three extra columns (one per aggregate function); then create another column holding the value that matches the measure_type, as you described; and finally drop the three intermediate columns.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import spark.implicits._

val df = Seq((1, "k1", "measureA", 2), (1, "k1", "measureA", 4), (1, "k1", "measureB", 5), (1, "k1", "measureB", 7), (1, "k1", "measureC", 7), (1, "k1", "measureC", 1), (2, "k1", "measureB", 8), (2, "k1", "measureC", 9), (2, "k2", "measureA", 5), (2, "k2", "measureC", 5), (2, "k2", "measureC", 8)).toDF("ts","key","measure_type","value")
val mapping: Map[String, Column => Column] = Map(
  "sum" -> sum, "avg" -> avg, "max" -> max)

// Apply every aggregate function to every value column within each group.
val groupBy = Seq("ts", "key", "measure_type")
val aggregate = Seq("value")
val operations = Seq("sum", "avg", "max")
val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c))))

val df2 = df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*)
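
At this point df2 has one row per (ts, key, measure_type) group plus three generated columns named sum(value), avg(value) and max(value), which the next step selects between.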

val df3 = df2.withColumn("new_column",
    when($"measure_type" === "measureA", $"sum(value)")
      .when($"measure_type" === "measureB", $"avg(value)")
      .otherwise($"max(value)"))
  .drop("sum(value)")
  .drop("avg(value)")
  .drop("max(value)")

df3 now carries, for each (ts, key, measure_type) combination, the aggregate that matches its measure_type in new_column.
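
Since df3 still keeps one row per (ts, key, measure_type), one extra pivot step (my addition, not part of the answer above) reshapes it into the wide layout from the question; first just picks the single aggregated value present in each group:

import org.apache.spark.sql.functions.first

val wide = df3.groupBy("ts", "key").pivot("measure_type").agg(first("new_column"))

The two answers also combine naturally: drive the first answer's conditional aggregation from a measure-to-function map, so that supporting a new measure only means extending the map. A minimal sketch, where aggByMeasure and result are my own names and the mapping mirrors the question's spec:

import org.apache.spark.sql.functions.{avg, col, max, sum, when}
import org.apache.spark.sql.Column

// Each measure_type maps to the aggregate that should summarize its values.
val aggByMeasure: Map[String, Column => Column] = Map(
  "measureA" -> sum,
  "measureB" -> avg,
  "measureC" -> max)

// Build one conditional aggregate per measure, named after the measure itself.
val exprs = aggByMeasure.map { case (m, f) =>
  f(when(col("measure_type") === m, col("value"))).as(m)
}.toSeq

val result = df.groupBy("ts", "key").agg(exprs.head, exprs.tail: _*)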
