I have 3 DataFrames: df1 (EMPLOYEE_INFO), df2 (DEPARTMENT_INFO) and df3 (COMPANY_INFO). I want to update a column in df1 by joining all three DataFrames. The column is FLAG_DEPARTMENT in df1, and I need to set FLAG_DEPARTMENT = 'POLITICS'. In SQL, the query looks like this:
UPDATE [COMPANY_INFO] INNER JOIN ([DEPARTMENT_INFO]
INNER JOIN [EMPLOYEE_INFO] ON [DEPARTMENT_INFO].DEPT_ID = [EMPLOYEE_INFO].DEPT_ID
ON [COMPANY_INFO].[COMPANY_DEPT_ID] = [DEPARTMENT_INFO].[DEP_COMPANYID]
SET EMPLOYEE_INFO.FLAG_DEPARTMENT = "POLITICS";
If the values in the join columns of these three tables match, I need to set FLAG_DEPARTMENT = 'POLITICS' in my EMPLOYEE_INFO table.
How can I achieve the same thing in PySpark? I have just started learning PySpark and don't have much in-depth knowledge yet.
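You can use a chain of joins with a select on top of it. Spark DataFrames are immutable, so instead of updating df1 in place you build a new DataFrame that contains the flag column.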
Suppose that you have the following PySpark DataFrames:
employee_df
+---------+-------+
|     Name|dept_id|
+---------+-------+
|     John| dept_a|
|      Liù| dept_b|
|     Luke| dept_a|
|  Michail| dept_a|
|      Noe| dept_e|
|Shinchaku| dept_c|
|     Vlad| dept_e|
+---------+-------+
department_df
+-------+----------+------------+
|dept_id|company_id| description|
+-------+----------+------------+
| dept_a|  company1|Department A|
| dept_b|  company2|Department B|
| dept_c|  company5|Department C|
| dept_d|  company3|Department D|
+-------+----------+------------+
company_df
+----------+-----------+
|company_id|description|
+----------+-----------+
|  company1|  Company 1|
|  company2|  Company 2|
|  company3|  Company 3|
|  company4|  Company 4|
+----------+-----------+
Then you can run the following code to add the flag_department column to your employee_df:
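from pyspark.sql import functions as F

employee_df = (
    employee_df.alias('a')
    # Left join keeps every employee; department columns are null when dept_id has no match.
    # An explicit condition keeps both a.dept_id and b.dept_id addressable by alias.
    .join(
        department_df.alias('b'),
        on=F.col('a.dept_id') == F.col('b.dept_id'),
        how='left',
    )
    # Second left join; company columns are null when company_id has no match.
    .join(
        company_df.alias('c'),
        on=F.col('b.company_id') == F.col('c.company_id'),
        how='left',
    )
    .select(
        # All original employee columns, taken from the 'a' side only.
        *[F.col(f'a.{c}') for c in employee_df.columns],
        # 'POLITICS' when both joins matched; F.when without otherwise yields null.
        F.when(
            F.col('b.dept_id').isNotNull() & F.col('c.company_id').isNotNull(),
            F.lit('POLITICS'),
        ).alias('flag_department'),
    )
)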
The new employee_df will be:
+---------+-------+---------------+
|     Name|dept_id|flag_department|
+---------+-------+---------------+
|     John| dept_a|       POLITICS|
|      Liù| dept_b|       POLITICS|
|     Luke| dept_a|       POLITICS|
|  Michail| dept_a|       POLITICS|
|      Noe| dept_e|           null|
|Shinchaku| dept_c|           null|
|     Vlad| dept_e|           null|
+---------+-------+---------------+