I am wondering which approach is the most efficient in Spark to get the 4 DataFrames below:
- df1 - left_anti
- df2 - left_semi
- df3 - right_anti
- df4 - right_semi
Approach 1 (1 join, 4 filters):
import pyspark.sql.functions as sf

# assumes left_df and right_df have been aliased as 'left_df' and 'right_df'
merged_df = left_df.join(right_df, join_condition, how='full_outer')
df1 = merged_df.filter(sf.col('right_df.col1').isNull()).select('left_df.*')     # left anti
df2 = merged_df.filter(sf.col('right_df.col1').isNotNull()).select('left_df.*')  # left semi
df3 = merged_df.filter(sf.col('left_df.col1').isNull()).select('right_df.*')     # right anti
df4 = merged_df.filter(sf.col('left_df.col1').isNotNull()).select('right_df.*')  # right semi
Approach 2 (4 joins, 0 filters):
df1 = left_df.join(right_df, join_condition, how='left_anti')
df2 = left_df.join(right_df, join_condition, how='left_semi')
# Spark has no 'right_anti'/'right_semi' join type, so swap the sides instead
df3 = right_df.join(left_df, join_condition, how='left_anti')
df4 = right_df.join(left_df, join_condition, how='left_semi')
where col1 is the primary key column (non-nullable) in both DataFrames, and
join_condition = (sf.col('left_df.col1') == sf.col('right_df.col1'))
Which of the two approaches above is more efficient?
Ref: https://medium.com/bild-journal/pyspark-joins-explained-9c4fba124839
CodePudding user response:
Before commenting on efficiency, I just want to point out that, generally speaking, the df_n produced by the two approaches may not be identical:
>>> from pyspark.sql.functions import col
>>> df1 = spark.createDataFrame([{'id1': 0, 'val1': "a"},{'id1': 1, 'val1': "b"},{'id1': None, 'val1': "df1"}])
>>> df2 = spark.createDataFrame([{'id2': 1, 'val2': "d"},{'id2': 2, 'val2': "e"},{'id2': None, 'val2': "df2"}])
>>> df1.show()
+----+----+
| id1|val1|
+----+----+
|   0|   a|
|   1|   b|
|null| df1|
+----+----+
>>> df2.show()
+----+----+
| id2|val2|
+----+----+
|   1|   d|
|   2|   e|
|null| df2|
+----+----+
>>> df1.join(df2, col("id1") == col("id2"), how="full_outer").show()
+----+----+----+----+
| id1|val1| id2|val2|
+----+----+----+----+
|   0|   a|null|null|
|null| df1|null|null|
|null|null|null| df2|
|   1|   b|   1|   d|
|null|null|   2|   e|
+----+----+----+----+
>>> df1.join(df2, col("id1") == col("id2"), how="full_outer").filter(col('id2').isNull()).select(df1["*"]).show()
+----+----+
| id1|val1|
+----+----+
|   0|   a|
|null| df1|
|null|null|
+----+----+
>>> df1.join(df2, col("id1") == col("id2"), how="left_anti").show()
+----+----+
| id1|val1|
+----+----+
|   0|   a|
|null| df1|
+----+----+
>>> df1.join(df2, col('id1') == col('id2'), how='full_outer').filter(col('id2').isNotNull()).select(df1['*']).show()
+----+----+
| id1|val1|
+----+----+
|   1|   b|
|null|null|
+----+----+
>>> df1.join(df2, col('id1') == col('id2'), how='left_semi').show()
+---+----+
|id1|val1|
+---+----+
|  1|   b|
+---+----+
This is, of course, because of how nulls are treated by SQL joins: a null key never satisfies an equality condition, and the unmatched side of a full outer join comes back as all-null columns, which the isNull/isNotNull filters then pick up.
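If null keys ever did need to match (not the case in the question, where col1 is non-nullable), a null-safe equality (Column.eqNullSafe, i.e. SQL's <=>) could be used in the join condition. A minimal sketch, continuing the session above:
>>> # eqNullSafe treats two nulls as equal, so the null-keyed rows of df1 and df2 now match
>>> df1.join(df2, col("id1").eqNullSafe(col("id2")), how="left_semi").show()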
CodePudding user response:
[Posting my answer hoping it could be revised by a more experienced user]
I'd say it won't matter much. Spark reorganizes these operations during optimization, so if the end result is the same, the DAG (Directed Acyclic Graph) and the execution plan will end up roughly the same.
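One way to verify this rather than guess is to compare the plans Spark actually produces for the two formulations of the same frame. A sketch, using left_df, right_df, join_condition and sf as defined in the question:

# plans for df1 computed via the full outer join plus a filter
merged_df = left_df.join(right_df, join_condition, how='full_outer')
merged_df.filter(sf.col('right_df.col1').isNull()).select('left_df.*').explain(True)

# plans for df1 computed via a dedicated left anti join
left_df.join(right_df, join_condition, how='left_anti').explain(True)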
If the objective is performance, then a single join would be preferable, because it can take advantage of a broadcast join (if the DataFrame on the right side is not too big and can be held in memory).
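For reference, the broadcast can also be requested explicitly with the broadcast() hint, e.g. on one of the single-purpose joins from Approach 2. A sketch, assuming right_df is small enough to fit in executor memory:

import pyspark.sql.functions as sf

# Hint that right_df should be broadcast to every executor instead of being shuffled.
# Whether Spark actually uses a broadcast join still depends on the join type
# (not every join type supports a broadcast hash join) and on
# spark.sql.autoBroadcastJoinThreshold.
df2 = left_df.join(sf.broadcast(right_df), join_condition, how='left_semi')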