I want to know whether a Window that is used x times triggers x shuffles of the data.
Example:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('col_a').orderBy('date')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.withColumn('new_col_2', F.row_number().over(w))
Will this code perform one shuffle of the data because there is one Window, or two shuffles because the Window is used twice?
If the answer is two shuffles, would repartitioning by col_a first reduce the number of shuffles to one, as in the code below?
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('col_a').orderBy('date')
df = df.repartition('col_a')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.withColumn('new_col_2', F.row_number().over(w))
Answer:
If we display how Spark will compute this DataFrame with explain(), we get the following execution plan:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('col_a').orderBy('date')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.withColumn('new_col_2', F.row_number().over(w))
df.explain()
# == Physical Plan ==
# Window [lag(col_b#2, -1, null) windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS new_col_1#19, row_number() windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
# +- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
#    +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#23]
#       +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
As you can see, there is only one Exchange step (meaning one shuffle). So reusing the same window to compute several columns triggers only one shuffle, as long as no other shuffle is introduced between those computations. Moreover, there is only one Window step, meaning that the two window columns are computed in the same step rather than one after the other.
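Counting Exchange steps by eye gets tedious on larger plans, so here is a minimal sketch of a helper that counts them in the text printed by explain(). The name count_shuffles and the abbreviated plan string are made up for illustration, and the helper assumes each shuffle appears as a plan node whose name is exactly Exchange, as in the plan above.

```python
import re

def count_shuffles(plan_text: str) -> int:
    """Count Exchange operators (shuffles) in a plan printed by explain()."""
    # Match "Exchange" only as the operator name at the start of a plan node,
    # after any tree-drawing or comment characters (#, spaces, +, -, :, |).
    # Nodes such as BroadcastExchange are deliberately not counted.
    pattern = re.compile(r"^[#\s+\-:|]*Exchange\b")
    return sum(1 for line in plan_text.splitlines() if pattern.match(line))

# Plan from the answer above, with the Window expressions abbreviated.
plan = """\
== Physical Plan ==
Window [lag(...) AS new_col_1#19, row_number() ... AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
+- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#23]
      +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
"""

print(count_shuffles(plan))  # prints 1
```

With a live DataFrame, the plan text could probably be captured by wrapping df.explain() in contextlib.redirect_stdout, assuming explain() prints through Python's standard output in your PySpark version.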
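Other cases
If we repartition by col_a before computing the window columns, the execution plan has the same shape as without the repartition. There is still a single Exchange, now marked REPARTITION instead of ENSURE_REQUIREMENTS: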
w = Window.partitionBy('col_a').orderBy('date')
df = df.repartition('col_a')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.withColumn('new_col_2', F.row_number().over(w))
df.explain()
# == Physical Plan ==
# Window [lag(col_b#2, -1, null) windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS new_col_1#19, row_number() windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
# +- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
#    +- Exchange hashpartitioning(col_a#1L, 200), REPARTITION, [id=#26]
#       +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
If we repartition by col_a between the two window column computations, the two columns are no longer computed in the same Window step, but there is still only one Exchange:
w = Window.partitionBy('col_a').orderBy('date')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.repartition('col_a')
df = df.withColumn('new_col_2', F.row_number().over(w))
df.explain()
# == Physical Plan ==
# Window [row_number() windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
# +- Window [lag(col_b#2, -1, null) windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS new_col_1#19], [col_a#1L], [date#0 ASC NULLS FIRST]
#    +- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
#       +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#33]
#          +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
If we repartition by col_b between the two window column computations, we get three shuffles. So reusing the same window triggers only one shuffle provided there is no repartition/shuffle on other columns between the window column computations:
w = Window.partitionBy('col_a').orderBy('date')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.repartition('col_b')
df = df.withColumn('new_col_2', F.row_number().over(w))
df.explain()
# == Physical Plan ==
# Window [row_number() windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
# +- *(3) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
#    +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#42]
#       +- Exchange hashpartitioning(col_b#2, 200), REPARTITION, [id=#41]
#          +- Window [lag(col_b#2, -1, null) windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS new_col_1#19], [col_a#1L], [date#0 ASC NULLS FIRST]
#             +- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
#                +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#36]
#                   +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
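To make the three shuffles in this last plan easier to read, here is a rough sketch that pulls the partitioning columns and the origin of each Exchange out of the plan text and lists them in execution order. The helper name shuffle_sequence is hypothetical, and the regular expression only handles hashpartitioning Exchange lines written in the format shown in this answer.

```python
import re

# Assumes lines shaped like:
#   Exchange hashpartitioning(<columns>, <num partitions>), <ORIGIN>, [id=#..]
EXCHANGE = re.compile(r"Exchange hashpartitioning\((.+?), \d+\), (\w+)")

def shuffle_sequence(plan_text):
    """List (partition columns, origin) for each hash Exchange, in execution order."""
    found = []
    for line in plan_text.splitlines():
        match = EXCHANGE.search(line)
        if match:
            # Drop Spark's expression ids such as "#1L" to keep only column names.
            columns = re.sub(r"#\d+L?", "", match.group(1))
            found.append((columns, match.group(2)))
    # explain() prints the last operator first, so reverse to get execution order.
    return found[::-1]

# The three Exchange lines of the plan above, top to bottom.
exchange_lines = """\
Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#42]
Exchange hashpartitioning(col_b#2, 200), REPARTITION, [id=#41]
Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#36]
"""

for columns, origin in shuffle_sequence(exchange_lines):
    print(columns, origin)
# col_a ENSURE_REQUIREMENTS
# col_b REPARTITION
# col_a ENSURE_REQUIREMENTS
```

Read in execution order, the data is shuffled on col_a for the first window, then on col_b by the explicit repartition, and then on col_a again because the second window can no longer rely on the earlier partitioning.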