I have a SQL query which I am trying to transform into PySpark; it has some joins and multiple where conditions:
UPDATE COMPANY1
INNER JOIN COMPANY2
ON COMPANY1.C1_PROFIT = COMPANY2.C2_PROFIT
SET COMPANY2.C2_TARGET = "1"
WHERE (((COMPANY2.C2_TARGET) Is Null)
AND ((COMPANY1.C1_SALES) Is Null)
AND ((COMPANY2.C2_PROFIT) Is Not Null));
PySpark query I am trying to execute (df_1 -> COMPANY2 and df_2 -> COMPANY1):
join = ((df_1.C2_PROFIT == df_2.C1_PROFIT) & \
        (df_1.C2_TARGET == '') & \
        (df_2.C1_SALES == '') & \
        (df_1.C2_PROFIT != ''))
df_1 = (df_1.alias('a')
        .join(df_2.alias('b'), join, 'left')
        .select(
            *[c for c in df_1.columns if c != 'C2_TARGET'],
            F.expr("nvl2(b.C1_PROFIT, '1', a.C2_TARGET) C2_TARGET")
        )
)
But I am still getting null values in column "C2_TARGET".
For information: column "C1_PROFIT" is null-free, but "C2_PROFIT" sometimes contains nulls as well as values.
Example inputs:
+-----------+----------+
| C1_PROFIT | C1_SALES |
+-----------+----------+
| 5637      | Positive |
| 7464      |          |
| 43645     |          |
| 64657     | Growth P |
+-----------+----------+

+-----------+-----------+
| C2_PROFIT | C2_TARGET |
+-----------+-----------+
|           |           |
| 7464      |           |
| 43645     |           |
| 64657     |           |
+-----------+-----------+
Expected result:
+-----------+-----------+
| C2_PROFIT | C2_TARGET |
+-----------+-----------+
|           |           |
| 7464      | 1         |
| 43645     | 1         |
| 64657     |           |
+-----------+-----------+
CodePudding user response:
Based on the query, it seems like a case when operation that assigns the value "1" wherever the conditions are met. This can be replicated using a when().otherwise().
Assuming company1 and company2 are the two dataframes:
from pyspark.sql import functions as func

final_sdf = company1. \
    join(company2, company1.c1_profit == company2.c2_profit, 'inner'). \
    withColumn('c2_target',
               func.when(company2.c2_target.isNull() &
                         company1.c1_sales.isNull() &
                         company2.c2_profit.isNotNull(),
                         func.lit("1")
                         ).
               otherwise(company2.c2_target)
               )
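For reference, here is how the inputs could be created to try the snippet above; a minimal sketch assuming a running spark session, with the question's sample data and column names lower-cased to match:
# Hypothetical setup mirroring the question's tables
company1 = spark.createDataFrame(
    [(5637, 'Positive'), (7464, None), (43645, None), (64657, 'Growth P')],
    ['c1_profit', 'c1_sales'])
company2 = spark.createDataFrame(
    [(None, None), (7464, None), (43645, None), (64657, None)],
    'c2_profit int, c2_target string')
Note that the inner join keeps only matched rows: company2's all-null row (and company1's unmatched 5637 row) disappear from final_sdf, whereas the SQL UPDATE keeps every row of COMPANY2.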
CodePudding user response:
In this answer, you have an example of how to do

UPDATE A INNER JOIN B
...
SET A...

In your case, you SET B...:

UPDATE A INNER JOIN B
...
SET B...

You have correctly switched the order of your dataframes. What's not correct is that '' is not the same as null; an equality comparison against '' never matches a null. You must use .isNull() and .isNotNull() in your conditions.
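To see the difference, a minimal sketch (assuming a running spark session; demo is just a throwaway dataframe):
from pyspark.sql import functions as F

demo = spark.createDataFrame([(None,), ('',)], 'v string')
demo.select(
    F.col('v').isNull().alias('is_null'),   # True for the null row, False for ''
    (F.col('v') == '').alias('eq_empty')    # null for the null row, True for ''
).show()
A condition like (col == '') evaluates to null for null values, and null conditions are treated as false, so such rows never match.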
Example inputs:
from pyspark.sql import functions as F
df_1 = spark.createDataFrame(
    [(5637, 'Positive'),
     (7464, None),
     (43645, None),
     (64657, 'Growth P')],
    ['C1_PROFIT', 'C1_SALES'])
df_2 = spark.createDataFrame(
    [(None, None),
     (7464, None),
     (43645, None),
     (64657, None)],
    'C2_PROFIT int, C2_TARGET string')
Script:
join_on = (df_1.C1_PROFIT == df_2.C2_PROFIT) & \
          df_2.C2_TARGET.isNull() & \
          df_1.C1_SALES.isNull() & \
          df_2.C2_PROFIT.isNotNull()
df = (df_2.alias('a')
      .join(df_1.alias('b'), join_on, 'left')
      .select(
          *[c for c in df_2.columns if c != 'C2_TARGET'],
          F.expr("nvl2(b.C1_PROFIT, '1', a.C2_TARGET) C2_TARGET")
      )
)
df.show()
# +---------+---------+
# |C2_PROFIT|C2_TARGET|
# +---------+---------+
# |     null|     null|
# |     7464|        1|
# |    64657|     null|
# |    43645|        1|
# +---------+---------+
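nvl2(x, y, z) returns y when x is not null and z otherwise, so the select sets C2_TARGET to '1' exactly where the left join found a match. If you prefer the DataFrame API over the SQL expression, an equivalent sketch (same df_1, df_2 and join_on as above):
# when/otherwise equivalent of the nvl2 expression
df = (df_2.alias('a')
      .join(df_1.alias('b'), join_on, 'left')
      .select(
          *[F.col(f'a.{c}') for c in df_2.columns if c != 'C2_TARGET'],
          F.when(F.col('b.C1_PROFIT').isNotNull(), F.lit('1'))
           .otherwise(F.col('a.C2_TARGET'))
           .alias('C2_TARGET')
      ))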