I have a SQL query which I am trying to convert into PySpark. In the SQL query, we join two tables and update a column where the condition matches. The query looks like this:
UPDATE [STUDENT_TABLE] INNER JOIN [COLLEGE_DATA]
ON ([STUDENT_TABLE].UNIQUEID = COLLEGE_DATA.PROFESSIONALID)
AND ([STUDENT_TABLE].[ADDRESS] = COLLEGE_DATA.STATE_ADDRESS)
SET STUDENT_TABLE.STUDENTINSTATE = "REGULAR"
WHERE (((STUDENT_TABLE.BLOCKERS) Is Null));
Answer:

Spark DataFrames are immutable, so the UPDATE has to be expressed as a join followed by a select that rebuilds the updated column.
Example inputs:
from pyspark.sql import functions as F

# assumes an active SparkSession bound to `spark`
df_stud = spark.createDataFrame(
    [(1, 'x', None, 'REG'),
     (2, 'y', 'qwe', 'REG')],
    ['UNIQUEID', 'ADDRESS', 'BLOCKERS', 'STUDENTINSTATE'])

df_college = spark.createDataFrame(
    [(1, 'x'), (2, 'x')],
    ['PROFESSIONALID', 'STATE_ADDRESS'])
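For reference, df_stud before the update looks like this:

df_stud.show()
# +--------+-------+--------+--------------+
# |UNIQUEID|ADDRESS|BLOCKERS|STUDENTINSTATE|
# +--------+-------+--------+--------------+
# |       1|      x|    null|           REG|
# |       2|      y|     qwe|           REG|
# +--------+-------+--------+--------------+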
Your query would update just the first row of df_stud: the value in the column "STUDENTINSTATE" would become "REGULAR".

In the following script, we do the join, then select all the columns from df_stud except "STUDENTINSTATE", which must be updated. That column gets the value "REGULAR" if the column "PROFESSIONALID" (from df_college) is not null, i.e. the join condition was satisfied; nvl2(x, y, z) returns y when x is not null and z otherwise. If the join condition is not satisfied, the value should not be updated, so it is taken from the existing "STUDENTINSTATE" as is.
# The WHERE clause (BLOCKERS Is Null) becomes part of the join condition
join_on = ((df_stud.UNIQUEID == df_college.PROFESSIONALID)
           & (df_stud.ADDRESS == df_college.STATE_ADDRESS)
           & df_stud.BLOCKERS.isNull())

df = (df_stud.alias('a')
      .join(df_college.alias('b'), join_on, 'left')
      .select(
          # keep every df_stud column except the one being updated
          *[c for c in df_stud.columns if c != 'STUDENTINSTATE'],
          # nvl2: 'REGULAR' when the join matched, old value otherwise
          F.expr("nvl2(b.PROFESSIONALID, 'REGULAR', a.STUDENTINSTATE) STUDENTINSTATE")
      )
)
df.show()
# +--------+-------+--------+--------------+
# |UNIQUEID|ADDRESS|BLOCKERS|STUDENTINSTATE|
# +--------+-------+--------+--------------+
# |       1|      x|    null|       REGULAR|
# |       2|      y|     qwe|           REG|
# +--------+-------+--------+--------------+
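As a side note, the same update can be written without a SQL string by using F.when / otherwise in the DataFrame API. This is an equivalent sketch (df2 is just an illustrative name):

# Same logic with when/otherwise instead of the nvl2 SQL expression
df2 = (df_stud.alias('a')
       .join(df_college.alias('b'), join_on, 'left')
       .select(
           *[f'a.{c}' for c in df_stud.columns if c != 'STUDENTINSTATE'],
           F.when(F.col('b.PROFESSIONALID').isNotNull(), F.lit('REGULAR'))
            .otherwise(F.col('a.STUDENTINSTATE'))
            .alias('STUDENTINSTATE')))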