Conditional calculation with two datasets - PySpark


Imagine you have two datasets df and df2 like the following:

df:

ID Size Condition
1   2      1
2   3      0
3   5      0 
4   7      1

df2:

 aux_ID Scalar
  1       2
  3       2

I want an output where, if Condition in df is 1, Size is multiplied by the matching Scalar, and df is returned with the changed values.

I would like to do this as efficiently as possible, perhaps avoiding the join if that's possible.

output_df:

ID Size Condition
1   4      1
2   3      0
3   5      0 
4   7      1

CodePudding user response:

Not sure why you would want to avoid joins in the first place; they can be quite efficient in their own right (see the note on broadcast joins at the end).

With that said, this can easily be done by joining the two datasets and building a case-when expression on the condition.

Data Preparation

from io import StringIO

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Build the two inputs via pandas for convenience
df1 = pd.read_csv(
    StringIO("""ID,Size,Condition
1,2,1
2,3,0
3,5,0
4,7,1
"""),
    delimiter=',',
)

df2 = pd.read_csv(
    StringIO("""aux_ID,Scalar
1,2
3,2
"""),
    delimiter=',',
)

sparkDF1 = spark.createDataFrame(df1)
sparkDF2 = spark.createDataFrame(df2)

sparkDF1.show()

+---+----+---------+
| ID|Size|Condition|
+---+----+---------+
|  1|   2|        1|
|  2|   3|        0|
|  3|   5|        0|
|  4|   7|        1|
+---+----+---------+

sparkDF2.show()

+------+------+
|aux_ID|Scalar|
+------+------+
|     1|     2|
|     3|     2|
+------+------+

Case When

# Left join keeps every row of sparkDF1; rows without a matching
# aux_ID get null Scalar, so their Size passes through unchanged.
finalDF = (
    sparkDF1.join(
        sparkDF2,
        sparkDF1['ID'] == sparkDF2['aux_ID'],
        'left'
    )
    .select(sparkDF1['*'], sparkDF2['Scalar'], sparkDF2['aux_ID'])
    .withColumn(
        'Size_Changed',
        F.when(
            (F.col('Condition') == 1) & F.col('aux_ID').isNotNull(),
            F.col('Size') * F.col('Scalar')
        ).otherwise(F.col('Size'))
    )
)

finalDF.show()

+---+----+---------+------+------+------------+
| ID|Size|Condition|Scalar|aux_ID|Size_Changed|
+---+----+---------+------+------+------------+
|  1|   2|        1|     2|     1|           4|
|  3|   5|        0|     2|     3|           5|
|  2|   3|        0|  null|  null|           3|
|  4|   7|        1|  null|  null|           7|
+---+----+---------+------+------+------------+

You can drop the unnecessary columns; I kept them here for illustration, as shown below.
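For example, a minimal sketch that reshapes finalDF into the output_df requested in the question (output_df is just the asker's name for the result; the rename lets Size_Changed take over the Size column):

output_df = (
    finalDF
    .drop('Size', 'Scalar', 'aux_ID')
    .withColumnRenamed('Size_Changed', 'Size')
    .select('ID', 'Size', 'Condition')
)

output_df.show()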

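On the asker's efficiency concern: with a lookup table this small, Spark will typically plan a broadcast join on its own, and you can make that explicit by wrapping the small side as F.broadcast(sparkDF2) in the join above. If you really want to skip the join entirely and df2 fits comfortably in driver memory, a hedged, join-free sketch is to collect the scalars into a literal map column (create_map and element_at are standard PySpark functions; the variable names are mine):

from itertools import chain

# Pull the small lookup table to the driver: {aux_ID: Scalar}
scalars = {row['aux_ID']: row['Scalar'] for row in sparkDF2.collect()}

# Turn the pairs into a literal map column and look up each row's ID
scalar_map = F.create_map(
    *chain.from_iterable((F.lit(k), F.lit(v)) for k, v in scalars.items())
)
scalar_for_id = F.element_at(scalar_map, F.col('ID'))

# Multiply Size in place when Condition is 1 and the ID has a scalar
output_df = sparkDF1.withColumn(
    'Size',
    F.when(
        (F.col('Condition') == 1) & scalar_for_id.isNotNull(),
        F.col('Size') * scalar_for_id
    ).otherwise(F.col('Size'))
)

output_df.show()

This trades the shuffle for embedding the scalars in the query plan itself, which only makes sense while df2 stays tiny; past that point, the broadcast join is the better tool.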