Home > OS >  How to update dataframe column value while joinining with other dataframe in pyspark?
How to update dataframe column value while joinining with other dataframe in pyspark?

Time:10-12

I have 3 Dataframe df1(EMPLOYEE_INFO),df2(DEPARTMENT_INFO),df3(COMPANY_INFO) and i want to update a column which is in df1 by joining all the three dataframes . The name of column is FLAG_DEPARTMENT which is in df1. I need to set the FLAG_DEPARTMENT='POLITICS' . In sql query will look like this.

UPDATE [COMPANY_INFO] INNER JOIN ([DEPARTMENT_INFO] 
INNER JOIN [EMPLOYEE_INFO] ON [DEPARTMENT_INFO].DEPT_ID = [EMPLOYEE_INFO].DEPT_ID
ON [COMPANY_INFO].[COMPANY_DEPT_ID] = [DEPARTMENT_INFO].[DEP_COMPANYID]
SET EMPLOYEE_INFO.FLAG_DEPARTMENT = "POLITICS";

if the values in columns of these three tables matches i need to set my FLAG_DEPARTMENT='POLITICS' in my employee_Info Table

How can i achieve this same thing in pyspark. I have just started learning pyspark don't have that much depth knowledge?

CodePudding user response:

You can use a chain of joins with a select on top of it.

Suppose that you have the following pyspark DataFrames:

employee_df
 --------- ------- 
|     Name|dept_id|
 --------- ------- 
|     John| dept_a|
|      Liù| dept_b|
|     Luke| dept_a|
|  Michail| dept_a|
|      Noe| dept_e|
|Shinchaku| dept_c|
|     Vlad| dept_e|
 --------- ------- 

department_df
 ------- ---------- ------------ 
|dept_id|company_id| description|
 ------- ---------- ------------ 
| dept_a|  company1|Department A|
| dept_b|  company2|Department B|
| dept_c|  company5|Department C|
| dept_d|  company3|Department D|
 ------- ---------- ------------ 

company_df
 ---------- ----------- 
|company_id|description|
 ---------- ----------- 
|  company1|  Company 1|
|  company2|  Company 2|
|  company3|  Company 3|
|  company4|  Company 4|
 ---------- ----------- 

Then you can run the following code to add the flag_department column to your employee_df:

from pyspark.sql import functions as F

employee_df = (
        employee_df.alias('a')
        .join(
            department_df.alias('b'),
            on='dept_id',
            how='left',
        )
        .join(
            company_df.alias('c'),
            on=F.col('b.company_id') == F.col('c.company_id'),
            how='left',
        )
        .select(
            *[F.col(f'a.{c}') for c in employee_df.columns],
            F.when(
                F.col('b.dept_id').isNotNull() & F.col('c.company_id').isNotNull(),
                F.lit('POLITICS')
            ).alias('flag_department')
        )
    )

The new employee_df will be:

 --------- ------- --------------- 
|     Name|dept_id|flag_department|
 --------- ------- --------------- 
|     John| dept_a|       POLITICS|
|      Liù| dept_b|       POLITICS|
|     Luke| dept_a|       POLITICS|
|  Michail| dept_a|       POLITICS|
|      Noe| dept_e|           null|
|Shinchaku| dept_c|           null|
|     Vlad| dept_e|           null|
 --------- ------- --------------- 
  • Related