Can someone explain with a practical example how ENSURE_REQUIREMENTS comes into effect?
Reading up on this topic has not really made it clear to me.
I looked here https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala but I am not sure what to make of it. Is it some sort of insurance by Spark that things go well? I find the documentation cryptic.
You can refer to another SO question of mine: Spark JOIN on 2 DF's with same Partitioner in 2.4.5 vs 3.1.2 appears to differ in approach, unfavourably for newer version. There I experimented, but I do not get the gist of why this is occurring.
None of my colleagues can explain it either.
CodePudding user response:
Let's assume we want to find out how weather affects tourist visits to Acadia National Park:
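For context, assume the weather and visits views were registered from source data roughly like this. The file paths and read options here are just assumptions to make the example reproducible; only the view names matter for what follows:
scala> val dw = spark.read.option("header", "true").option("inferSchema", "true").csv("/path/to/weather.csv")
scala> dw.createOrReplaceTempView("weather")
scala> val dv = spark.read.option("header", "true").option("inferSchema", "true").csv("/path/to/visits.csv")
scala> dv.createOrReplaceTempView("visits")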
scala> spark.sql("SET spark.sql.shuffle.partitions=10")
scala> val ds = spark.sql("SELECT Date, AVG(VisitDuration) AvgVisitDuration FROM visits GROUP BY Date")
scala> ds.createOrReplaceTempView("visit_stats")
scala> val dwv = spark.sql("SELECT /* MERGEJOIN(v) */ w.*, v.AvgVisitDuration FROM weather w JOIN visit_stats v ON w.Date = v.Date")
scala> dwv.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Date#91, MaxTemp#92, AverageTemp#93, MinTemp#94, Precip#95, Conditions#96, AvgVisitDuration#216]
   +- SortMergeJoin [Date#91], [Date#27], Inner
      :- Sort [Date#91 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(Date#91, 10), ENSURE_REQUIREMENTS, [id=#478]
      :     +- Filter isnotnull(Date#91)
      :        +- FileScan ...
      +- Sort [Date#27 ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[Date#27], functions=[avg(cast(VisitDuration#31 as double))])
            +- Exchange hashpartitioning(Date#27, 10), ENSURE_REQUIREMENTS, [id=#474]
               +- HashAggregate(keys=[Date#27], functions=[partial_avg(cast(VisitDuration#31 as double))])
                  +- Filter isnotnull(Date#27)
                     +- FileScan ...
Worth noting that a) Spark decided to shuffle both datasets into 10 partitions, both to calculate the average and to perform the join, and b) the shuffle origin in both cases is ENSURE_REQUIREMENTS.
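The same exchanges show up if you express the aggregation and join with the DataFrame API instead of SQL; here is a quick sketch, assuming dw and dv are the DataFrames backing the weather and visits views above (depending on table sizes you may see a broadcast join instead of a sort-merge join):
scala> val stats = dv.groupBy("Date").agg(avg("VisitDuration").as("AvgVisitDuration"))
scala> dw.join(stats, "Date").explain()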
Now let's say the visits dataset is quite large, so we want to increase the parallelism of our stats calculation, and we repartition it by Date into a higher number of partitions.
scala> val dvr = dv.repartition(100, col("Date"))
scala> dvr.createOrReplaceTempView("visits_rep")
scala> val ds = spark.sql("SELECT Date, AVG(AvgDuration) AvgVisitDuration FROM visits_rep GROUP BY Date")
scala> ds.createOrReplaceTempView("visit_stats")
scala> val dwv = spark.sql("SELECT /* MERGEJOIN(v) */ w.*, v.AvgVisitDuration from weather w join visit_stats v on w.Date = v.Date")
scala> dwv.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Date#91, MaxTemp#92, AverageTemp#93, MinTemp#94, Precip#95, Conditions#96, AvgVisitDuration#231]
   +- SortMergeJoin [Date#91], [Date#27], Inner
      :- Sort [Date#91 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(Date#91, 100), ENSURE_REQUIREMENTS, [id=#531]
      :     +- Filter isnotnull(Date#91)
      :        +- FileScan ...
      +- Sort [Date#27 ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[Date#27], functions=[avg(cast(VisitDuration#31 as double))])
            +- HashAggregate(keys=[Date#27], functions=[partial_avg(cast(VisitDuration#31 as double))])
               +- Exchange hashpartitioning(Date#27, 100), REPARTITION_BY_NUM, [id=#524]
                  +- Filter isnotnull(Date#27)
                     +- FileScan ...
Here, the REPARTITION_BY_NUM shuffle origin dictated the need for 100 partitions, so Spark optimized the other, ENSURE_REQUIREMENTS, exchange to also use a hundred, thus eliminating the need for another shuffle.
This is just one simple case, but I'm sure there are many other optimizations Spark can apply to a DAG that contains shuffles with the ENSURE_REQUIREMENTS origin.
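If you want to check shuffle origins programmatically rather than reading explain() output, something along these lines should work. This is only a sketch: it assumes Spark 3.1+ (where ShuffleExchangeExec carries a shuffleOrigin field) and that AQE is switched off for the inspection, because with AQE enabled the exchanges are hidden inside the AdaptiveSparkPlan node:
scala> import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
scala> spark.conf.set("spark.sql.adaptive.enabled", "false")
scala> val q = spark.sql("SELECT /*+ MERGEJOIN(v) */ w.*, v.AvgVisitDuration FROM weather w JOIN visit_stats v ON w.Date = v.Date")
scala> q.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => (e.outputPartitioning, e.shuffleOrigin) }.foreach(println)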