Scala - Spark : Operate on all the columns of DF at once and create a case-class for each column efficiently


I am working with a DataFrame with more than 50,000 rows and around 30-100 columns. The data is numeric, and I want to create, for each column, a case class with these attributes:

  • max value
  • min value
  • standard deviation

and save this data structure to a Map (col_name -> ColData case-class).

I will work with this toy-example to demonstrate:

// The desired case-class & desired map:
case class ColData(name: String, min: Double, max: Double, std_dev: Double)
var columnsData: Map[String, ColData] = Map.empty


// The toy example:
val df = Seq(
         (1, 20, 30, 4),
         (2, 30, 40, 5),
         (3, 10, 30, 2)
       ).toDF("col0", "col1", "col2", "col3")

+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|   1|  20|  30|   4|
|   2|  30|  40|   5|
|   3|  10|  30|   2|
+----+----+----+----+
  1. I have already tried this approach:
import org.apache.spark.sql.functions.{min, max, stddev}

df.columns.foreach(c => {
    val (min_c, max_c, std_dev_c) = df.select(min(c), max(c), stddev(c)).as[(Double, Double, Double)].first()
    columnsData = columnsData + (c -> ColData(c, min_c, max_c, std_dev_c))
})

But I found it to be slow and inefficient.



  2. I also thought of this approach (which is supposed to iterate over the DF fewer times), but I could not tell whether it is any better:
val cols = df.columns  // column names to aggregate over
val min_cols = df.select(cols.map(min): _*).first().toSeq.toArray.map(_.toString.toDouble)
val max_cols = df.select(cols.map(max): _*).first().toSeq.toArray.map(_.toString.toDouble)
val std_cols = df.select(cols.map(stddev): _*).first().toSeq.toArray.map(_.toString.toDouble)

val seq = Seq(min_cols, max_cols, std_cols).transpose

df.columns.foreach(c => {
    val chosen = seq(df.columns.indexOf(c))
    val (min, max, std_dev) = (chosen(0), chosen(1), chosen(2))
    columnsData = columnsData + (c -> ColData(c, min, max, std_dev))
})

My questions:

  1. Are there any other ways to do this faster and more efficiently?
  2. Should I use cache/persist (before) and unpersist (after) to improve performance? How should it be done?

Thank you!

CodePudding user response:

Your first approach is inefficient because you end up traversing your DataFrame multiple times (as many times as you have columns). The second one is a bit better, but it still does three traversals.

Why not do everything in a single scan?

df.select(
    // one min / max / stddev aggregate per column, all computed in a single job
    df.columns.flatMap { c => Seq(min(c), max(c), stddev(c)) }: _*
  )
  .first()
  .toSeq
  .grouped(3)                  // regroup the flat row into (min, max, stddev) triples
  .zip(df.columns.iterator)    // pair each triple with its column name
  .collect { case (Seq(min: Number, max: Number, stddev: Number), col) =>
    col -> ColData(col, min.doubleValue, max.doubleValue, stddev.doubleValue)
  }
  .toMap
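For completeness, here is a minimal usage sketch on the toy DataFrame above: the same single-scan expression assigned to the map the question asks for, plus a couple of lookups. The val name and the lookups are only illustrative, and the usual spark.implicits._ / functions imports are assumed to be in scope.

import org.apache.spark.sql.functions.{min, max, stddev}

// Same single-scan expression as above, assigned to the requested Map.
val columnsData: Map[String, ColData] =
  df.select(df.columns.flatMap(c => Seq(min(c), max(c), stddev(c))): _*)
    .first()
    .toSeq
    .grouped(3)
    .zip(df.columns.iterator)
    .collect { case (Seq(mn: Number, mx: Number, sd: Number), col) =>
      col -> ColData(col, mn.doubleValue, mx.doubleValue, sd.doubleValue)
    }
    .toMap

// Looking up a single column's statistics:
columnsData("col1")      // ColData(col1,10.0,30.0,10.0) for the toy data
columnsData("col0").max  // 3.0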