Home > Net >  spark scala groupby without explode
spark scala groupby without explode

Time:07-26

I have DataFrame:

col1: String
col2: String
col3: Array[String]
col4: Long

I do:

df
  .withColumn("col3", explode(col("col3"))
  .groupBy("col1", "col2", "col3")
  .agg(
    sum("col4").alias("col4")
  )

PROBLEM: explode blows out the executors memory and I want to avoid adding bigger and more executors as I am already using 0.5TB of memory in total and ~350cpus.

How can I do groupBy col1, col2, col3 without explode?

I was thinking about splitting df into two:

df1:
col1: String
col2: Stirng
col3: Array[String]
group_id: Long <--- monotonically_increasing_id

df2:
group_id: Long
col4: Long

and somehow do collect_set on col3 together with information about group_id and after all link group_id information between df1 and df2 but I am not sure if it makes sense at all.

How can I solve it?

CodePudding user response:

I don't think you can avoid having an explode, as at a point in your algorithm you need to transform an array of string to a string.

However, you can groupBy over the two first columns, using an User-Defined Aggregate Function to compute a map representing the different values in col3 and their col4 sum, and then explode this map into col3 and sum of col4.

By doing so, memory usage should not explode, as at any point of the algorithm you don't duplicate values in col1, col2 and col4.

Here is the complete code. First you define your custom aggregator

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder

case class Record(col3: Seq[String], col4: Int)

case class Output(col3: String, col4: Int)

object ValuesMap extends Aggregator[Record, mutable.Map[String, Int], Seq[Output]] {

  def zero: mutable.Map[String, Int] = mutable.Map()

  def reduce(values: mutable.Map[String, Int], currentRecord: Record): mutable.Map[String, Int] = {
    currentRecord.col3.foreach(
      element => values.put(element, values.getOrElse(element, 0)   currentRecord.col4)
    )
    values
  }

  def merge(values1: mutable.Map[String, Int], values2: mutable.Map[String, Int]): mutable.Map[String, Int] = {
    values1.foreach(
      element => values2.put(element._1, values2.getOrElse(element._1, 0)   element._2)
    )
    values2
  }

  def finish(reduction: mutable.Map[String, Int]): Seq[Output] = {
    reduction.map(element => Output(element._1, element._2)).toSeq
  }


  def bufferEncoder: Encoder[mutable.Map[String, Int]] = ExpressionEncoder[mutable.Map[String, Int]]

  def outputEncoder: Encoder[Seq[Output]] = ExpressionEncoder[Seq[Output]]

}

And then you call your aggregator as an user-defined aggregate function with udaf method:

import org.apache.spark.sql.functions.{col, explode, udaf}

val values_map = udaf(ValuesMap)

val result = df.groupBy("col1", "col2")
  .agg(values_map(col("col3"), col("col4")).alias("values"))
  .withColumn("value", explode(col("values")))
  .select(
    "col1",
    "col2",
    "value.col3",
    "value.col4"
  )

If you have the following input dataframe:

 ---- ---- --------- ---- 
|col1|col2|col3     |col4|
 ---- ---- --------- ---- 
|1   |a   |[x, y]   |1   |
|1   |b   |[y]      |10  |
|1   |a   |[y, z]   |100 |
|2   |b   |[x, y, z]|1000|
 ---- ---- --------- ---- 

you will get the following result dataframe:

 ---- ---- ---- ---- 
|col1|col2|col3|col4|
 ---- ---- ---- ---- 
|1   |a   |z   |100 |
|1   |a   |y   |101 |
|1   |a   |x   |1   |
|1   |b   |y   |10  |
|2   |b   |z   |1000|
|2   |b   |y   |1000|
|2   |b   |x   |1000|
 ---- ---- ---- ---- 

CodePudding user response:

You can try using: spark.sql.adaptive.enabled if you are on spark 3.0. It's purpose to help with Skew joins, I'm not sure if it will help with explodes but it would be something worth trying.

If that doesn't work you could try breaking up your explode to ease the pressure on memory. (Similar to what you might do for skew before adaptive query was availble.) This

This code is all hand written and untested but give you the concept idea of what I'm suggesting. Expressions should work in both Spark2/3.

df
  .withColumn("col3_size", size(col("col3")) #helper
  .withColumn("col3_step",floor(col3_size/4)) #helper
  .withColumn("col3_a", expr(" explode(slice(col3,0,col3_step)) ") #break of a small piece
  .withColumn("col3_b", expr(" explode(slice(col3,col3_step 1,col3_step)  "))
  .withColumn("col3_c",  expr(" explode(slice(col3,col3_step*2 1,col3_step)  "))
  .withColumn("col3_d", expr(" explode(slice(col3,(col3_step*3 1),((col3_step*3)-col3_size))  ")) #funky math to ensure we get it all
  .drop($"col3")
  .withColumn("col3", coalesce( $"col3_a",$"col3_b",$"col3_c",$"col3_d" ) ) #combine all the info back together.
  .drop( $"col3_size", $"col3_step", $"col3_a", $"col3_b", $"col3_c", $"col3_d") #clean up after.
  • Related