I have a Spark DataFrame like this:
scala> val data = Seq((1, "k1", "measureA", 2), (1, "k1", "measureA", 4), (1, "k1", "measureB", 5), (1, "k1", "measureB", 7), (1, "k1", "measureC", 7), (1, "k1", "measureC", 1), (2, "k1", "measureB", 8), (2, "k1", "measureC", 9), (2, "k2", "measureA", 5), (2, "k2", "measureC", 5), (2, "k2", "measureC", 8))
data: Seq[(Int, String, String, Int)] = List((1,k1,measureA,2), (1,k1,measureA,4), (1,k1,measureB,5), (1,k1,measureB,7), (1,k1,measureC,7), (1,k1,measureC,1), (2,k1,measureB,8), (2,k1,measureC,9), (2,k2,measureA,5), (2,k2,measureC,5), (2,k2,measureC,8))
scala> val rdd = spark.sparkContext.parallelize(data)
rdd: org.apache.spark.rdd.RDD[(Int, String, String, Int)] = ParallelCollectionRDD[22] at parallelize at <console>:27
scala> val df = rdd.toDF("ts","key","measure_type","value")
df: org.apache.spark.sql.DataFrame = [ts: int, key: string ... 2 more fields]
scala> df.show
+---+---+------------+-----+
| ts|key|measure_type|value|
+---+---+------------+-----+
|  1| k1|    measureA|    2|
|  1| k1|    measureA|    4|
|  1| k1|    measureB|    5|
|  1| k1|    measureB|    7|
|  1| k1|    measureC|    7|
|  1| k1|    measureC|    1|
|  2| k1|    measureB|    8|
|  2| k1|    measureC|    9|
|  2| k2|    measureA|    5|
|  2| k2|    measureC|    5|
|  2| k2|    measureC|    8|
+---+---+------------+-----+
I want to pivot on measure_type and apply different aggregation types to the value, depending on measure_type:
- measureA -> sum
- measureB -> avg
- measureC -> max
Then, get the following output dataframe:
+---+---+--------+--------+--------+
| ts|key|measureA|measureB|measureC|
+---+---+--------+--------+--------+
|  1| k1|       6|       6|       7|
|  2| k1|    null|       8|       9|
|  2| k2|       5|    null|       8|
+---+---+--------+--------+--------+
Thanks a lot.
CodePudding user response:
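import org.apache.spark.sql.functions._

// when(...) without otherwise(...) yields null for non-matching rows,
// and sum/avg/max skip nulls, so each aggregate only sees its own measure_type.
val ddf = df.groupBy("ts", "key").agg(
  sum(when(col("measure_type") === "measureA", col("value"))).as("measureA"),
  avg(when(col("measure_type") === "measureB", col("value"))).as("measureB"),
  max(when(col("measure_type") === "measureC", col("value"))).as("measureC"))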
The result is:
scala> ddf.show(false)
+---+---+--------+--------+--------+
|ts |key|measureA|measureB|measureC|
+---+---+--------+--------+--------+
|2  |k2 |5       |null    |8       |
|2  |k1 |null    |8.0     |9       |
|1  |k1 |6       |6.0     |7       |
+---+---+--------+--------+--------+
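If the list of measure types grows, the same conditional-aggregation idea can be driven from a lookup table instead of being written out by hand. The following is only a sketch: the name aggByMeasure is made up for illustration, and the local SparkSession setup is an assumption (in spark-shell, spark already exists).

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

// Assumption: a local session for a standalone run; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("conditional-agg").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "k1", "measureA", 2), (1, "k1", "measureA", 4),
  (1, "k1", "measureB", 5), (1, "k1", "measureB", 7),
  (1, "k1", "measureC", 7), (1, "k1", "measureC", 1),
  (2, "k1", "measureB", 8), (2, "k1", "measureC", 9),
  (2, "k2", "measureA", 5), (2, "k2", "measureC", 5),
  (2, "k2", "measureC", 8)
).toDF("ts", "key", "measure_type", "value")

// Hypothetical lookup: measure_type -> aggregate to apply to its values.
val aggByMeasure: Seq[(String, Column => Column)] = Seq(
  ("measureA", (c: Column) => sum(c)),
  ("measureB", (c: Column) => avg(c)),
  ("measureC", (c: Column) => max(c))
)

// One conditional aggregate per measure type; non-matching rows become null
// and are ignored by the aggregate.
val exprs = aggByMeasure.map { case (measure, agg) =>
  agg(when(col("measure_type") === measure, col("value"))).as(measure)
}

val wide = df.groupBy("ts", "key")
  .agg(exprs.head, exprs.tail: _*)
  .orderBy("ts", "key")

wide.show(false)
```

This should produce the same three rows as the hand-written version, only sorted by ts and key.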
CodePudding user response:
I think it's tedious to do this with the built-in pivot function, because it applies the same aggregate function(s) to every pivoted value.
Here is what I would do instead: map a predefined list of aggregate functions over the DataFrame, which gives one extra column per aggregate function (three here); then add another column that picks the right aggregate for each measure_type, as you described; and finally drop the three columns created in the previous step.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import spark.implicits._
val df = Seq((1, "k1", "measureA", 2), (1, "k1", "measureA", 4), (1, "k1", "measureB", 5), (1, "k1", "measureB", 7), (1, "k1", "measureC", 7), (1, "k1", "measureC", 1), (2, "k1", "measureB", 8), (2, "k1", "measureC", 9), (2, "k2", "measureA", 5), (2, "k2", "measureC", 5), (2, "k2", "measureC", 8)).toDF("ts","key","measure_type","value")
val mapping: Map[String, Column => Column] = Map(
"sum" -> sum, "avg" -> avg, "max" -> max)
val groupBy = Seq("ts","key","measure_type")
val aggregate = Seq("value")
val operations = Seq("sum", "avg", "max")
val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c))))
val df2 = df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*)
val df3 = df2.withColumn("new_column",
    when($"measure_type" === "measureA", $"sum(value)")
      .when($"measure_type" === "measureB", $"avg(value)")
      .otherwise($"max(value)"))
  .drop("sum(value)")
  .drop("avg(value)")
  .drop("max(value)")
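df3 now holds the selected aggregate for each (ts, key, measure_type) in new_column; it still has to be pivoted on measure_type to get the wide layout from the question.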
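To go from that long result to one column per measure type, a final pivot is needed. A minimal sketch, assuming the same sample data; the names longDf and wide and the aliases s, a and m are made up for illustration, and the local SparkSession is an assumption (spark-shell already provides spark). Because new_column mixes a sum, an average and a max, I would expect Spark to widen it to double.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumption: a local session for a standalone run; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("agg-then-pivot").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "k1", "measureA", 2), (1, "k1", "measureA", 4),
  (1, "k1", "measureB", 5), (1, "k1", "measureB", 7),
  (1, "k1", "measureC", 7), (1, "k1", "measureC", 1),
  (2, "k1", "measureB", 8), (2, "k1", "measureC", 9),
  (2, "k2", "measureA", 5), (2, "k2", "measureC", 5),
  (2, "k2", "measureC", 8)
).toDF("ts", "key", "measure_type", "value")

// Step 1: compute all three aggregates per (ts, key, measure_type),
// then keep only the one that belongs to each measure type.
val longDf = df.groupBy("ts", "key", "measure_type")
  .agg(sum("value").as("s"), avg("value").as("a"), max("value").as("m"))
  .withColumn("new_column",
    when($"measure_type" === "measureA", $"s")
      .when($"measure_type" === "measureB", $"a")
      .otherwise($"m"))
  .select("ts", "key", "measure_type", "new_column")

// Step 2: pivot. Each (ts, key, measure_type) group has exactly one row here,
// so first(...) simply picks that value.
val wide = longDf.groupBy("ts", "key")
  .pivot("measure_type", Seq("measureA", "measureB", "measureC"))
  .agg(first("new_column"))
  .orderBy("ts", "key")

wide.show(false)
```

Listing the pivot values explicitly fixes the column order and saves Spark the extra pass it would otherwise need to discover the distinct values.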