What is the best way to get different join output in pyspark?

Time:12-08

I am wondering which approach is the most efficient way in Spark to get the four DataFrames below:

  • df1 - left_anti
  • df2 - left_semi
  • df3 - right_anti
  • df4 - right_semi

Approach 1: (join - 1, filter - 4)

merged_df = left_df.join(right_df, join_condition, how='full_outer')
df1 = merged_df.filter(sf.col('right_df.col1').isNull()).select('left_df.*')
df2 = merged_df.filter(sf.col('right_df.col1').isNotNull()).select('left_df.*')
df3 = merged_df.filter(sf.col('left_df.col1').isNull()).select('right_df.*')
df4 = merged_df.filter(sf.col('left_df.col1').isNotNull()).select('right_df.*')

Approach 2: (join - 4, filter - 0)

df1 = left_df.join(right_df, join_condition, how='left_anti')
df2 = left_df.join(right_df, join_condition, how='left_semi')
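df3 = right_df.join(left_df, join_condition, how='left_anti')  # Spark has no 'right_anti' join type; swap the operands instead
df4 = right_df.join(left_df, join_condition, how='left_semi')  # Spark has no 'right_semi' join type; swap the operands instead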

where col1 is the primary key column (non-nullable) in both DataFrames, and

join_condition = (sf.col('left_df.col1') == sf.col('right_df.col1'))

Which of the two approaches above is more efficient?

Ref: https://medium.com/bild-journal/pyspark-joins-explained-9c4fba124839

CodePudding user response:

Before commenting on efficiency, I just want to point out that, generally speaking, the df_n produced by the two approaches may not be identical:

>>> from pyspark.sql.functions import col
>>> df1 = spark.createDataFrame([{'id1': 0, 'val1': "a"},{'id1': 1, 'val1': "b"},{'id1': None, 'val1': "df1"}])
>>> df2 = spark.createDataFrame([{'id2': 1, 'val2': "d"},{'id2': 2, 'val2': "e"},{'id2': None, 'val2': "df2"}])

>>> df1.show()
+----+----+
| id1|val1|
+----+----+
|   0|   a|
|   1|   b|
|null| df1|
+----+----+

>>> df2.show()
+----+----+
| id2|val2|
+----+----+
|   1|   d|
|   2|   e|
|null| df2|
+----+----+

>>> df1.join(df2, col("id1") == col("id2"), how="full_outer").show()
+----+----+----+----+
| id1|val1| id2|val2|
+----+----+----+----+
|   0|   a|null|null|
|null| df1|null|null|
|null|null|null| df2|
|   1|   b|   1|   d|
|null|null|   2|   e|
+----+----+----+----+

>>> df1.join(df2, col("id1") == col("id2"), how="full_outer").filter(col('id2').isNull()).select(df1["*"]).show()
+----+----+
| id1|val1|
+----+----+
|   0|   a|
|null| df1|
|null|null|
+----+----+

>>> df1.join(df2, col("id1") == col("id2"), how="left_anti").show()
+----+----+
| id1|val1|
+----+----+
|   0|   a|
|null| df1|
+----+----+

>>> df1.join(df2, col('id1') == col('id2'), how='full_outer').filter(col('id2').isNotNull()).select(df1['*']).show()
+----+----+
| id1|val1|
+----+----+
|   1|   b|
|null|null|
+----+----+

>>> df1.join(df2, col('id1') == col('id2'), how='left_semi').show()
+---+----+
|id1|val1|
+---+----+
|  1|   b|
+---+----+

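This is, of course, because of how nulls are treated by SQL joins: a null key never compares equal to anything, and the full outer join also keeps the rows that exist only in df2, whose df1 columns are all null.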
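
To make the difference concrete, here is a small plain-Python model of the same data. It is not Spark code, only a sketch of the semantics: None stands in for SQL NULL, and the helper names (sql_eq, left_semi, left_anti, full_outer) are made up for this illustration. It reproduces the outputs shown above and suggests that, for the semi case, filtering the full outer join on the keys of both sides brings the two approaches back in line.

```python
# Plain-Python model of the joins above; None plays the role of SQL NULL.
left = [(0, "a"), (1, "b"), (None, "df1")]
right = [(1, "d"), (2, "e"), (None, "df2")]


def sql_eq(a, b):
    """SQL-style equality: a comparison involving NULL is never true."""
    return a is not None and b is not None and a == b


def left_semi(l_rows, r_rows):
    """Left rows that have at least one matching right row."""
    return [l for l in l_rows if any(sql_eq(l[0], r[0]) for r in r_rows)]


def left_anti(l_rows, r_rows):
    """Left rows that have no matching right row."""
    return [l for l in l_rows if not any(sql_eq(l[0], r[0]) for r in r_rows)]


def full_outer(l_rows, r_rows):
    """Return (left_row, right_row) pairs; a missing side is (None, None)."""
    empty = (None, None)
    pairs = []
    for l in l_rows:
        matches = [r for r in r_rows if sql_eq(l[0], r[0])]
        if matches:
            pairs.extend((l, r) for r in matches)
        else:
            pairs.append((l, empty))
    for r in r_rows:
        if not any(sql_eq(l[0], r[0]) for l in l_rows):
            pairs.append((empty, r))
    return pairs


merged = full_outer(left, right)

# Filter on the right key only, as in the full_outer examples above:
# right-only rows leak through as all-null left rows.
naive_semi = [l for l, r in merged if r[0] is not None]

# Stricter filter: require a key on both sides of the merged row.
strict_semi = [l for l, r in merged if l[0] is not None and r[0] is not None]

print(naive_semi)   # [(1, 'b'), (None, None)]
print(strict_semi)  # [(1, 'b')]
```

The anti case is harder to repair this way when keys can be null, because a left row with a null key and a right-only row both show a null left key after the merge. With non-nullable primary keys, as in the question, checking the left key for non-null and the right key for null should be enough.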

CodePudding user response:

[Posting my answer hoping it could be revised by a more experienced user]

I'd say it won't matter. Spark reorganizes these operations during optimization, so if the end result is the same, the DAG (Directed Acyclic Graph) and the execution plan will be more or less the same.

If the objective is performance, then a single join would be more convenient, because it can take advantage of a broadcast join (if the right-hand DataFrame is not too big and can be held in memory).
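
The idea behind a broadcast join can be sketched in plain Python: the small side is turned into an in-memory lookup that every task can probe, so the large side is scanned once without a shuffle. The snippet below is only a conceptual model, not Spark's implementation. The function name split_by_match and the sample rows are hypothetical, and it assumes unique, non-null keys as stated in the question.

```python
def split_by_match(big_rows, small_rows, key=lambda row: row[0]):
    """Probe an in-memory key set built from the small ("broadcast") side.

    Returns (semi, anti): the rows of big_rows whose key is, or is not,
    present in small_rows.
    """
    small_keys = {key(r) for r in small_rows}  # built once, reused for every probe
    semi, anti = [], []
    for row in big_rows:  # single scan of the large side
        (semi if key(row) in small_keys else anti).append(row)
    return semi, anti


# Hypothetical sample data with unique, non-null keys.
left_rows = [(0, "a"), (1, "b"), (3, "c")]
right_rows = [(1, "d"), (2, "e")]

left_semi_rows, left_anti_rows = split_by_match(left_rows, right_rows)
right_semi_rows, right_anti_rows = split_by_match(right_rows, left_rows)

print(left_semi_rows, left_anti_rows)    # [(1, 'b')] [(0, 'a'), (3, 'c')]
print(right_semi_rows, right_anti_rows)  # [(1, 'd')] [(2, 'e')]
```

Whether Spark actually picks a broadcast strategy depends on the size of the data and the configuration, so it is worth checking the physical plan with explain() before relying on it.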
