How to calculate approximate quantiles from a list of integers in a Spark DataFrame column using Scala


I have a Spark DataFrame with a column containing arrays of integers of varying lengths. I need to add new columns holding the quantiles of each array. This is the input DataFrame:

+---------+------------------------+
|Comm     |List_Nb_total_operations|
+---------+------------------------+
|    comm1|         [1, 1, 2, 3, 4]|
|    comm4|                  [2, 2]|
|    comm3|                  [2, 2]|
|    comm0|[1, 1, 1, 2, 2, 2, 3, 3]|
|    comm2|         [1, 1, 1, 2, 3]|
+---------+------------------------+

This is the desired result:

+---------+------------------------+----+----+
|Comm     |List_Nb_total_operations|QT25|QT75|
+---------+------------------------+----+----+
|    comm1|         [1, 1, 2, 3, 4]|   1|   3|
|    comm4|                  [2, 2]|   2|   2|
|    comm3|                  [2, 2]|   2|   2|
|    comm0|[1, 1, 1, 2, 2, 2, 3, 3]|   1|   3|
|    comm2|         [1, 1, 1, 2, 3]|   1|   2|
+---------+------------------------+----+----+

CodePudding user response:

The function you want is percentile_approx, available in org.apache.spark.sql.functions since Spark 3.1:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("comm1", Seq(1, 1, 2, 3, 4)),
  ("comm4", Seq(2, 2)),
  ("comm3", Seq(2, 2)),
  ("comm0", Seq(1, 1, 1, 2, 2, 2, 3, 3)),
  ("comm2", Seq(1, 1, 1, 2, 3))
).toDF("Comm", "ops")

// Explode each array into one row per element, then aggregate per group.
val dfQ = df
  .select(col("Comm"), explode(col("ops")) as "ops")
  .groupBy("Comm")
  .agg(
    percentile_approx($"ops", lit(0.25), lit(100)) as "q25",
    percentile_approx($"ops", lit(0.75), lit(100)) as "q75"
  )

// Join the per-group quantiles back onto the original DataFrame.
val dfWithQ = df.join(dfQ, Seq("Comm"))

The documentation has more information on tuning the accuracy parameter (larger values give more accurate results at the cost of memory).
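If you want the output columns to match the names in the desired result (QT25/QT75), a final rename is enough. A minimal sketch, continuing from the dfWithQ above (the result name is mine):

```scala
// Rename the aggregate columns so the output matches the desired schema.
val result = dfWithQ
  .withColumnRenamed("q25", "QT25")
  .withColumnRenamed("q75", "QT75")

result.show()
```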

CodePudding user response:

Thank you for your help. I've found another solution that works very well in my case:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile

// Wraps the internal ApproximatePercentile Catalyst expression as a Column,
// so it can be used on Spark versions without functions.percentile_approx.
def percentile_approxx(col: Column, percentage: Column, accuracy: Column): Column = {
  val expr = new ApproximatePercentile(
    col.expr, percentage.expr, accuracy.expr
  ).toAggregateExpression
  new Column(expr)
}

val perc_df = df.groupBy("Comm")
  .agg(percentile_approxx(col("ops"), lit(0.75), lit(100)) as "QT75")
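Note that ApproximatePercentile is an internal Catalyst class, so this wrapper may break across Spark versions. A usage sketch computing both quantiles in one pass and joining them back, mirroring the first answer (the quantiles and withQuantiles names, and the column aliases, are mine):

```scala
// Compute both quantiles per group with the custom wrapper, then join back.
val quantiles = df.groupBy("Comm").agg(
  percentile_approxx(col("ops"), lit(0.25), lit(100)) as "QT25",
  percentile_approxx(col("ops"), lit(0.75), lit(100)) as "QT75"
)
val withQuantiles = df.join(quantiles, Seq("Comm"))
```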