I have an UPDATE SQL query that I need to convert to PySpark so it works with DataFrames. I'd like to know whether this is possible with DataFrames and, if so, how to do it.
The SQL query:
UPDATE TBL1
SET COL_C=1
FROM TBL1
INNER JOIN TBL2 ON TBL1.COL_A=TBL2.COL_A AND TBL1.COL_B=TBL2.COL_B
INNER JOIN TBL3 ON TBL2.COL_A=TBL3.COL_A AND TBL2.COL_B=TBL3.COL_B
df_TBL1=TBL1
+-------+--------+----------+------+-----+
|  COL_A|   COL_B|       dob|gender|COL_C|
+-------+--------+----------+------+-----+
|  James|   Smith|1991-04-01|     M| 3000|
|Michael|    Rose|2000-05-19|     M| 4000|
| Robert|Williams|1978-09-05|     M| 4000|
|  Maria|   Jones|1967-12-01|     F| 4000|
|    Jen|   Brown|1980-02-17|     F| 1000|
+-------+--------+----------+------+-----+
df_TBL2=TBL2
+-------+---------+----------+------+-----+
|  COL_A|    COL_B|       dob|gender|COL_C|
+-------+---------+----------+------+-----+
|   John|     Snow|1791-04-01|     M| 9000|
|Michael|     Rose|2000-05-19|     M| 4000|
| Robert|Baratheon|1778-09-05|     M| 9500|
|  Maria|    Jones|1967-12-01|     F| 4000|
+-------+---------+----------+------+-----+
df_TBL3=TBL3
+--------+------+----------+------+-----+
|   COL_A| COL_B|       dob|gender|COL_C|
+--------+------+----------+------+-----+
| Michael|  Rose|2000-05-19|     M| 4000|
|   Peter|Parker|1978-09-05|     M| 4000|
|   Maria| Jones|1967-12-01|     F| 4000|
|MaryJane| Brown|1980-12-17|     F|10000|
+--------+------+----------+------+-----+
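For reference, here is roughly how the three sample DataFrames can be built (a minimal sketch: the SparkSession setup is assumed, and spf is my alias for pyspark.sql.functions used in the snippets below):

from pyspark.sql import SparkSession
import pyspark.sql.functions as spf

spark = SparkSession.builder.getOrCreate()

cols = ["COL_A", "COL_B", "dob", "gender", "COL_C"]
df_TBL1 = spark.createDataFrame([
    ("James", "Smith", "1991-04-01", "M", 3000),
    ("Michael", "Rose", "2000-05-19", "M", 4000),
    ("Robert", "Williams", "1978-09-05", "M", 4000),
    ("Maria", "Jones", "1967-12-01", "F", 4000),
    ("Jen", "Brown", "1980-02-17", "F", 1000),
], cols)
df_TBL2 = spark.createDataFrame([
    ("John", "Snow", "1791-04-01", "M", 9000),
    ("Michael", "Rose", "2000-05-19", "M", 4000),
    ("Robert", "Baratheon", "1778-09-05", "M", 9500),
    ("Maria", "Jones", "1967-12-01", "F", 4000),
], cols)
df_TBL3 = spark.createDataFrame([
    ("Michael", "Rose", "2000-05-19", "M", 4000),
    ("Peter", "Parker", "1978-09-05", "M", 4000),
    ("Maria", "Jones", "1967-12-01", "F", 4000),
    ("MaryJane", "Brown", "1980-12-17", "F", 10000),
], cols)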
The joins give me:
df_TBL_ALL=df_TBL1 \
    .join(df_TBL2,(df_TBL1.COL_A==df_TBL2.COL_A) & (df_TBL1.COL_B==df_TBL2.COL_B),how="inner") \
    .join(df_TBL3,(df_TBL2.COL_A==df_TBL3.COL_A) & (df_TBL2.COL_B==df_TBL3.COL_B),how="inner") \
    .select(df_TBL1["*"]) \
    .withColumn("COL_C",spf.lit(1))
Then I join the result back onto df_TBL1:
df_TBL1_JOINED=df_TBL1 \
    .join(df_TBL_ALL,(df_TBL1.COL_A==df_TBL_ALL.COL_A) & (df_TBL1.COL_B==df_TBL_ALL.COL_B),how="left") \
    .select(df_TBL1["*"],
            spf.coalesce(df_TBL_ALL.COL_C,df_TBL1.COL_C).alias("COL_Nova"))
df_TBL1_JOINED.show()
# +-------+--------+----------+------+-----+--------+
# |  COL_A|   COL_B|       dob|gender|COL_C|COL_Nova|
# +-------+--------+----------+------+-----+--------+
# |  James|   Smith|1991-04-01|     M| 3000|    3000|
# |    Jen|   Brown|1980-02-17|     F| 1000|    1000|
# |  Maria|   Jones|1967-12-01|     F| 4000|       1|
# |Michael|    Rose|2000-05-19|     M| 4000|       1|
# | Robert|Williams|1978-09-05|     M| 4000|    4000|
# +-------+--------+----------+------+-----+--------+
But I'm confused about how to go on from there. To clean up the extra column, I did:
df_TBL1_JOINED=df_TBL1_JOINED \
    .drop("COL_C")
df_TBL1_JOINED=df_TBL1_JOINED \
    .withColumnRenamed("COL_Nova","COL_C")
df_TBL1=df_TBL1_JOINED
df_TBL1.show()
# +-------+--------+----------+------+-----+
# |  COL_A|   COL_B|       dob|gender|COL_C|
# +-------+--------+----------+------+-----+
# |  James|   Smith|1991-04-01|     M| 3000|
# |    Jen|   Brown|1980-02-17|     F| 1000|
# |  Maria|   Jones|1967-12-01|     F|    1|
# |Michael|    Rose|2000-05-19|     M|    1|
# | Robert|Williams|1978-09-05|     M| 4000|
# +-------+--------+----------+------+-----+
I got to the expected result, but I don't know whether this is the best-performing way to achieve it.
Expected result: df_TBL1 with COL_C set to 1 in every row present in the join of df_TBL1 with df_TBL2 and df_TBL3.
df_TBL1:
+-------+--------+----------+------+-----+
|  COL_A|   COL_B|       dob|gender|COL_C|
+-------+--------+----------+------+-----+
|  James|   Smith|1991-04-01|     M| 3000|
|Michael|    Rose|2000-05-19|     M|    1|
| Robert|Williams|1978-09-05|     M| 4000|
|  Maria|   Jones|1967-12-01|     F|    1|
|    Jen|   Brown|1980-02-17|     F| 1000|
+-------+--------+----------+------+-----+
CodePudding user response:
You can do it easily using temp tables and DataFrames in Spark SQL. A temp table is also a Spark SQL entity, so it works the same way as a DataFrame.
>>> TBL1.show()
+-----+--------+--------+
|COL_A|   COL_B|   COL_C|
+-----+--------+--------+
|    A|       B|  scjsdk|
|   A1|cnddssac|saacjsdk|
|    A|   cndds|  scjsdk|
+-----+--------+--------+
>>> TBL2.show()
+-----+-----------+----------+
|COL_A|      COL_B|     COL_C|
+-----+-----------+----------+
|    A|          B|   scjksdk|
|   A1|cndmmkdssac|sbkaacjsdk|
|    A|      cndds| scjjbjsdk|
+-----+-----------+----------+
>>> TBL3.show()
+-----+-----------+------------+
|COL_A|      COL_B|       COL_C|
+-----+-----------+------------+
|    A|          B|  scjcjbksdk|
|   A1|cndmmkdssac|sbkadaacjsdk|
|    A|      cndds|scjjdwfbjsdk|
+-----+-----------+------------+
>>> TBL1.createOrReplaceTempView("TBL1")
>>> TBL2.createOrReplaceTempView("TBL2")
>>> TBL3.createOrReplaceTempView("TBL3")
>>> final_df=spark.sql("select * FROM TBL1 INNER JOIN TBL2 ON TBL1.COL_A=TBL2.COL_A AND TBL1.COL_B=TBL2.COL_B INNER JOIN TBL3 ON TBL2.COL_A=TBL3.COL_A AND TBL2.COL_B=TBL3.COL_B")
>>> final_df.show()
+-----+-----+------+-----+-----+---------+-----+-----+------------+
|COL_A|COL_B| COL_C|COL_A|COL_B|    COL_C|COL_A|COL_B|       COL_C|
+-----+-----+------+-----+-----+---------+-----+-----+------------+
|    A|    B|scjsdk|    A|    B|  scjksdk|    A|    B|  scjcjbksdk|
|    A|cndds|scjsdk|    A|cndds|scjjbjsdk|    A|cndds|scjjdwfbjsdk|
+-----+-----+------+-----+-----+---------+-----+-----+------------+
*************** Update COL_C *******************
>>> from pyspark.sql.functions import lit
>>> final_df.withColumn("COL_C",lit(1)).show()
+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|COL_A|COL_B|COL_C|COL_A|COL_B|COL_C|COL_A|COL_B|COL_C|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|    A|    B|    1|    A|    B|    1|    A|    B|    1|
|    A|cndds|    1|    A|cndds|    1|    A|cndds|    1|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+
======= OR if you want to update only TBL1's COL_C value =======
>>> final_df=spark.sql("select TBL1.COL_A,TBL1.COL_B,TBL1.COL_C FROM TBL1 INNER JOIN TBL2 ON TBL1.COL_A=TBL2.COL_A AND TBL1.COL_B=TBL2.COL_B INNER JOIN TBL3 ON TBL2.COL_A=TBL3.COL_A AND TBL2.COL_B=TBL3.COL_B")
>>> final_df.show()
+-----+-----+------+
|COL_A|COL_B| COL_C|
+-----+-----+------+
|    A|    B|scjsdk|
|    A|cndds|scjsdk|
+-----+-----+------+
************ Update COL_C *****************
>>> final_df.withColumn("COL_C",lit(1)).show()
+-----+-----+-----+
|COL_A|COL_B|COL_C|
+-----+-----+-----+
|    A|    B|    1|
|    A|cndds|    1|
+-----+-----+-----+
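Note that final_df above contains only the rows that matched all three tables. If you want the full TBL1 with COL_C updated only on those rows (the expected result in the question), one way is to left-join the matched keys back onto TBL1; a minimal sketch (updates and TBL1_updated are names introduced here):

from pyspark.sql.functions import coalesce, col, lit

# keys that matched all three tables, flagged with the new value
updates = final_df.select("COL_A", "COL_B").distinct().withColumn("COL_C_new", lit(1))

# left-join back onto TBL1 and keep the new value wherever a match exists
TBL1_updated = TBL1.join(updates, ["COL_A", "COL_B"], "left") \
    .withColumn("COL_C", coalesce(col("COL_C_new"), col("COL_C"))) \
    .drop("COL_C_new")

The distinct() keeps the left join from duplicating TBL1 rows when a key occurs more than once.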
CodePudding user response:
Here is a concise and performant option. It does only the two necessary joins, avoiding the extra join back onto df_TBL1 that you used in your question.
from pyspark.sql import functions as F

# True only when the row found a match in both TBL2 and TBL3
# (F.forall requires Spark 3.1+)
updating = F.forall(F.array('t2', 't3'), lambda x: x)

df_TBL1 = (
    df_TBL1.withColumnRenamed('COL_C', 'COL_C_old').alias('T1')
    # flag each TBL1 row that has a match in TBL2 / TBL3
    .join(df_TBL2.withColumn('t2', F.lit(True)), ['COL_A', 'COL_B'], 'left')
    .join(df_TBL3.withColumn('t3', F.lit(True)), ['COL_A', 'COL_B'], 'left')
    # 1 where both flags are set, otherwise keep the old value
    .withColumn('updated_c', F.when(updating, 1).otherwise(F.col('COL_C_old')))
    .select('T1.*', F.col('updated_c').alias('COL_C'))
    .drop('COL_C_old')
)
df_TBL1.show()
# +-------+--------+----------+------+-----+
# |  COL_A|   COL_B|       dob|gender|COL_C|
# +-------+--------+----------+------+-----+
# |  James|   Smith|1991-04-01|     M| 3000|
# |    Jen|   Brown|1980-02-17|     F| 1000|
# |  Maria|   Jones|1967-12-01|     F|    1|
# |Michael|    Rose|2000-05-19|     M|    1|
# | Robert|Williams|1978-09-05|     M| 4000|
# +-------+--------+----------+------+-----+
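An equivalent formulation, if you prefer, is to compute the matching (COL_A, COL_B) keys first and then flag them on df_TBL1; a sketch under the same column-name assumptions (keys and hit are names introduced here):

from pyspark.sql import functions as F

# (COL_A, COL_B) pairs present in both df_TBL2 and df_TBL3
keys = (
    df_TBL2.select('COL_A', 'COL_B')
    .join(df_TBL3.select('COL_A', 'COL_B'), ['COL_A', 'COL_B'], 'inner')
    .distinct()
    .withColumn('hit', F.lit(True))
)

# overwrite COL_C with 1 only where the key matched
df_TBL1 = (
    df_TBL1.join(keys, ['COL_A', 'COL_B'], 'left')
    .withColumn('COL_C', F.when(F.col('hit'), 1).otherwise(F.col('COL_C')))
    .drop('hit')
)

The distinct() guards against row duplication in the left join, and this version also runs on Spark versions older than 3.1, where F.forall is not available.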