Home > other >  UPDATE a column value using JOIN and multiple WHERE conditions in PySpark
UPDATE a column value using JOIN and multiple WHERE conditions in PySpark

Time:11-12

I have a SQL query which I am trying to transform into PySpark which have some joins and multiple where conditions:

UPDATE COMPANY1
INNER JOIN COMPANY2
ON COMPANY1.C1_PROFIT = COMPANY2.C2_PROFIT 
SET COMPANY2.C2_TARGET = "1"
WHERE (((COMPANY2.C2_TARGET) Is Null)
  AND ((COMPANY1.C1_SALES) Is Null)
  AND ((COMPANY2.C2_PROFIT) Is Not Null));

PySpark query I am trying to execute (df_1->COMPANY2 & df_2->Company1):

join = ((df_1.C2_PROFIT == df_2.C1_PROFIT) & \
  (df_1.C2_TARGET=='') & \
  (df_2.C1_SALES=='') & \
  (df_1.C2_PROFIT!=''))
df_1 = (df_1.alias('a')
  .join(df_2.alias('b'), join, 'left')
  .select(
    *[c for c in df_1.columns if c != 'C2_TARGET'],
    F.expr("nvl2(b.C1_PROFIT, '1', a.C2_TARGET) C2_TARGET")
  )
)

But I am still getting null values in column "C2_TARGET".

For the information: column "C1_Profit" is null-free, but in "C2_Profit" we sometimes have null as well as values.

Example inputs:

 ------------------ -------------- 
|  C1_PROFIT       |C1_SALES      |
 ------------------ -------------- 
|5637              |     Positive |
|7464              |              |
|43645             |              |
|64657             |      Growth P|
 ------------------ -------------- 

 ------------------ -------------- 
|  C2_PROFIT       |C2_TARGET     |
 ------------------ -------------- 
|                  |              |
|7464              |              |
|43645             |              |
|64657             |              |
 ------------------ -------------- 

Expected result:

enter image description here

CodePudding user response:

based on the query, it seems like a case when operation that assigns the value "1" wherever the conditions are met. this can be replicated using a when().otherwise().

assuming company1 and company2 are the 2 dataframes

final_sdf = company1. \
    join(company2, company1.c1_profit = company2.c2_profit, 'inner'). \
    withColumn('c2_target', 
               func.when(company2.c2_target.isNull() & 
                         company1.c1_sales.isNull() & 
                         company2.c2_profit.isNotNull(), func.lit("1")
                         ).
               otherwise(company2.c2_target)
               )

CodePudding user response:

In this answer, you have an example of how to do

UPDATE A INNER JOIN B
...
SET A...

In your case, you SET B...:

UPDATE A INNER JOIN B
...
SET B...

You have correctly switched the order of your dataframes.

What's not correct is that '' is not the same as null. You must use .isNull() and .isNotNull() in your conditions.


Example inputs:

from pyspark.sql import functions as F
df_1 = spark.createDataFrame(
    [(5637, 'Positive'),
     (7464, None),
     (43645, None),
     (64657, 'Growth P')],
    ['C1_PROFIT', 'C1_SALES'])

df_2 = spark.createDataFrame(
    [(None, None),
     (7464, None),
     (43645, None),
     (64657, None)],
    'C2_PROFIT int, C2_TARGET string')

Script:

join_on = (df_1.C1_PROFIT == df_2.C2_PROFIT) & \
          df_2.C2_TARGET.isNull() & \
          df_1.C1_SALES.isNull() & \
          df_2.C2_PROFIT.isNotNull()
df = (df_2.alias('a')
    .join(df_1.alias('b'), join_on, 'left')
    .select(
        *[c for c in df_2.columns if c != 'C2_TARGET'],
        F.expr("nvl2(b.C1_PROFIT, '1', a.C2_TARGET) C2_TARGET")
    )
)

df.show()
#  --------- --------- 
# |C2_PROFIT|C2_TARGET|
#  --------- --------- 
# |     null|     null|
# |     7464|        1|
# |    64657|     null|
# |    43645|        1|
#  --------- --------- 
  • Related