I am trying to understand how a broadcast join works together with repartitioning. Can we use broadcast and repartition together? Will it be more useful, or will it result in bad performance?
For example: let's say we have a big dataframe myDf with 2 billion rows and a small, unpartitioned dataframe map_data. The common column is matching_col.
from pyspark.sql.functions import broadcast
# Perform Join with repartition before broadcast
myJoinedDf1 = myDf.join(broadcast(map_data.repartition('matching_col')), map_data.matching_col == myDf.matching_col)
# Perform Join with repartition after broadcast
myJoinedDf2 = myDf.join(broadcast(map_data).repartition('matching_col'), map_data.matching_col == myDf.matching_col)
Answer:
Until executor-side broadcast for broadcast joins is implemented in Spark (SPARK-17556), there is probably no reason or real value in repartitioning a dataframe that is about to be broadcast. As stated in that JIRA, "Currently in Spark SQL, in order to perform a broadcast join, the driver must collect the result of an RDD and then broadcast it." In other words, the small side's partition layout is discarded: the driver collects all of its rows anyway, so the repartition is pure wasted shuffle work.
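To see why the small side's partitioning cannot matter, here is a minimal plain-Python sketch (not actual Spark code, and the function name broadcast_hash_join is hypothetical) of what a driver-side broadcast hash join conceptually does:

```python
# Conceptual sketch of a driver-side broadcast hash join.
# The partition layout of the small side is irrelevant: the driver
# collects *all* of its rows into one hash map before shipping that
# map to every executor, which is why repartitioning map_data first
# is wasted work.

def broadcast_hash_join(big_partitions, small_rows, key=0):
    # "Driver" step: collect the entire small side into a single
    # hash map, no matter how it was partitioned beforehand.
    lookup = {row[key]: row for row in small_rows}

    # "Executor" step: each big-side partition probes the broadcast
    # map locally; the big side is never shuffled.
    joined = []
    for partition in big_partitions:
        joined.append([(row, lookup[row[key]])
                       for row in partition if row[key] in lookup])
    return joined

big = [[(1, "a"), (2, "b")], [(2, "c"), (3, "d")]]  # two partitions
small = [(1, "x"), (2, "y")]                        # small lookup side
result = broadcast_hash_join(big, small)
```

Each big-side partition is joined independently against the same collected map, so only the big side's partitioning affects how the work is divided.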
What would make a lot of sense, and has the potential to improve overall performance, is repartitioning the large (left-hand-side) dataframe before the join to a number of partitions N higher than the original:
# Perform Join with repartition before broadcast
myJoinedDf1 = myDf.repartition(N).join(broadcast(map_data), map_data.matching_col == myDf.matching_col)
This way, the broadcast join will be performed over smaller, equally sized partitions, which may prove increasingly useful as the join condition gets more complex.
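The effect of raising N can be sketched in plain Python (a hypothetical repartition helper, not the Spark API): hash-partitioning the same rows into more partitions yields smaller units of work per join task.

```python
# Illustration of why repartitioning the big side to more partitions
# can help: the total row count is unchanged, but each partition (and
# therefore each join task probing the broadcast map) is smaller, so
# work spreads more evenly across executor cores.

def repartition(rows, n):
    # Hash-partition rows by key into n buckets, mimicking
    # DataFrame.repartition(n) at a conceptual level.
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[hash(row[0]) % n].append(row)
    return parts

rows = [(i, f"v{i}") for i in range(8)]
few = repartition(rows, 2)   # 2 large partitions -> 2 heavy tasks
many = repartition(rows, 4)  # 4 smaller partitions -> more parallelism
```

Note that Spark's adaptive query execution (spark.sql.adaptive.enabled) can also coalesce or split partitions for you, so manual repartitioning is most valuable when you know the workload better than the optimizer does.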