Suppose I have two different windows with the same partitioning:
window1 = Window.partitionBy("id")
window2 = Window.partitionBy("id").orderBy("date")
And then I call several consecutive window functions using them:
df.withColumn("col1", F.sum("x").over(window1)) \
  .withColumn("col2", F.first("x").over(window2))
And suppose df is not already partitioned by id. Will the computation of col2 cause another shuffle, or will it reuse the same partitioning? Does adding df.repartition("id") before the computation bring any performance improvement?
CodePudding user response:
TLDR: only one shuffle occurs, repartition
is useless here.
That's actually quite easy to verify.
# sample data
from pyspark.sql import Window
from pyspark.sql import functions as f

df = spark.createDataFrame([
    (1, 2, "2022-10-22"),
    (1, 3, "2022-11-22"),
    (2, 4, "2023-12-12"),
    (2, 5, "2021-01-01")], ['id', 'x', 'date'])
# let us now introduce 3 windows and see what happens:
window1 = Window.partitionBy("id")
window2 = Window.partitionBy("id").orderBy("date")
window3 = Window.partitionBy("x").orderBy("date")
Now, let us call explain on your code:
df.withColumn("col1", f.sum("x").over(window1))\
.withColumn("col2", f.first("x").over(window2))\
.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [first(x#169L, false) windowspecdefinition(id#168L, date#170 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col2#233L], [id#168L], [date#170 ASC NULLS FIRST]
   +- Sort [id#168L ASC NULLS FIRST, date#170 ASC NULLS FIRST], false, 0
      +- Window [sum(x#169L) windowspecdefinition(id#168L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS col1#227L], [id#168L]
         +- Sort [id#168L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#168L, 200), ENSURE_REQUIREMENTS, [id=#164]
               +- Scan ExistingRDD[id#168L,x#169L,date#170]
As you can see, there is only one Exchange (i.e. one shuffle). Adding repartition yields essentially the same execution plan: the only difference is the reason recorded on the Exchange (REPARTITION_BY_COL instead of ENSURE_REQUIREMENTS), and there is still exactly one shuffle:
df.repartition("id")\
.withColumn("col1", f.sum("x").over(window1))\
.withColumn("col2", f.first("x").over(window2))\
.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [first(x#169L, false) windowspecdefinition(id#168L, date#170 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col2#246L], [id#168L], [date#170 ASC NULLS FIRST]
   +- Sort [id#168L ASC NULLS FIRST, date#170 ASC NULLS FIRST], false, 0
      +- Window [sum(x#169L) windowspecdefinition(id#168L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS col1#240L], [id#168L]
         +- Sort [id#168L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#168L, 200), REPARTITION_BY_COL, [id=#182]
               +- Scan ExistingRDD[id#168L,x#169L,date#170]
The takeaway here is that, regardless of how the data came to be partitioned, Spark remembers the existing partitioning and avoids shuffling again when a downstream operator requires the same one.
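If you want to check this on larger pipelines without reading plans by eye, a small helper (my own, not part of Spark's API) can count the Exchange operators in the text of a physical plan. You can capture that text from the string printed by explain(), for example via df._jdf.queryExecution().executedPlan().toString(), which is an internal, version-dependent API:

```python
def count_exchanges(plan: str) -> int:
    """Count Exchange (= shuffle) operators in a physical-plan string.

    Strips the tree markers ('+- ', indentation) and codegen stage
    markers ('*(1) ') before testing each line.
    """
    return sum(
        line.lstrip(" +-*()0123456789").startswith("Exchange")
        for line in plan.splitlines()
    )
```

Running it on the plan above returns 1, confirming the single shuffle.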
Finally, notice that with window3, which requires a different partitioning, we get two shuffles:
df.withColumn("col1", f.sum("x").over(window1))\
.withColumn("col2", f.first("id").over(window3))\
.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [first(id#168L, false) windowspecdefinition(x#169L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS col2#259L], [x#169L]
   +- Sort [x#169L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#169L, 200), ENSURE_REQUIREMENTS, [id=#207]
         +- Window [sum(x#169L) windowspecdefinition(id#168L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS col1#253L], [id#168L]
            +- Sort [id#168L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#168L, 200), ENSURE_REQUIREMENTS, [id=#203]
                  +- Scan ExistingRDD[id#168L,x#169L,date#170]
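A practical consequence is that the order in which you compute the windows matters. The behavior above can be captured by a toy model (my own simplification, not anything Spark exposes): Spark inserts one Exchange each time the required partitioning differs from the current one, so interleaving windows with different partition keys multiplies shuffles:

```python
def count_shuffles(partition_keys, initial=None):
    """Simplified model of Exchange insertion: one shuffle every time
    the required partitioning differs from the current one.

    partition_keys: partitionBy key of each window, in evaluation order.
    initial: partitioning the input already has, if any.
    """
    current = initial
    shuffles = 0
    for key in partition_keys:
        if key != current:
            shuffles += 1
            current = key
    return shuffles
```

Under this model, computing window1 then window2 (both on id) costs one shuffle, window1 then window3 costs two, and interleaving id/x/id windows would cost three, which is why grouping windows with the same partitioning together is the cheap ordering.

```python
count_shuffles(["id", "id"])        # window1 then window2: 1 shuffle
count_shuffles(["id", "x"])         # window1 then window3: 2 shuffles
count_shuffles(["id", "x", "id"])   # interleaved: 3 shuffles
```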