I have a small DataFrame, small_df, and a big DataFrame, big_df, and I want to join them:
big_df.join(small_df, 'id', 'left_semi')
To make the join more efficient I want to broadcast small_df, but I see two options:
1. big_df.join(pyspark.sql.functions.broadcast(small_df), 'id', 'left_semi')
2. big_df.join(sc.broadcast(small_df).value, 'id', 'left_semi')
What is the right way to do it? What is the difference between them?
Also, if I understand correctly, to use sc.broadcast I need to call collect first, right? Otherwise I get an exception about SparkContext being used outside the driver.
CodePudding user response:
From the PySpark docs:
pyspark.sql.functions.broadcast(df)
Marks a DataFrame as small enough for use in broadcast joins.
Docs for sc.broadcast:
class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None, sock_file=None)
A broadcast variable created with SparkContext.broadcast(). Access its value through value.
So the first one is a hint to Catalyst that this DataFrame should be joined with a broadcast join, even when the planner would otherwise prefer, for example, a sort-merge join.
If you want to know more about join hints, here is a comment from the Spark source code that describes this behaviour and the other join hints:
// If it is an equi-join, we first look at the join hints w.r.t. the following order:
// 1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
// have the broadcast hints, choose the smaller side (based on stats) to broadcast.
// 2. sort merge hint: pick sort merge join if join keys are sortable.
// 3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
// sides have the shuffle hash hints, choose the smaller side (based on stats) as the
// build side.
// 4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
//
// If there is no hint or the hints are not applicable, we follow these rules one by one:
// 1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
// is supported. If both sides are small, choose the smaller side (based on stats)
// to broadcast.
// 2. Pick shuffle hash join if one side is small enough to build local hash map, and is
// much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
// 3. Pick sort merge join if the join keys are sortable.
// 4. Pick cartesian product if join type is inner like.
// 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice.
The second option creates a broadcast variable, which is why you have to collect first: it needs a plain value, not a DataFrame. Interestingly, a broadcast join also collects your DataFrame on the driver, because that is how broadcasting currently works in Spark.
In your case you should use pyspark.sql.functions.broadcast(df). Use pyspark.Broadcast when you want a copy of some variable on each executor for other processing (for example, some small metadata), not for joins; for joins the hint is the better choice.