How to create a function that checks if values in 2 columns of a PySpark dataframe match values in the same 2 columns of another dataframe


How would you create a function that checks whether the values in two columns of a PySpark dataframe match the values in the same two columns of another PySpark dataframe? I want to add a new column that shows whether the values in those two columns exist in the other dataframe. The dataframes don't have identical columns, except for the two columns to join on. I'm new to PySpark. The code below shows a function that validates a match on one column:

from pyspark.sql import functions as F

def isValue_inTable(df1, df2, column_name, df2_nonCorresponding_column):
    # Left join keeps every row of df1; unmatched rows have nulls
    # in df2's columns, which flags values missing from df2.
    df3 = (df1.join(df2, on=column_name, how='left')
              .withColumn('Value_inTable',
                          F.when(df2[df2_nonCorresponding_column].isNull(), False)
                           .otherwise(True)))
    df3.select(column_name, 'Value_inTable').show()

The function above can show whether a value in one column of a PySpark dataframe exists in the same column of another dataframe. I want to modify this code so the function matches values in two columns of df1 against two columns of df2 and tells the user whether the values in the two columns of df1 exist in the two columns of df2. For example:

df1:

firstname  lastname  gender
Sam        Smith     M
Anna       Rose      F
Robert     Williams  M

df2:

firstname  lastname  gender  salary
Gerogie    Smith     M       3000
Anna       Rose      F       4100
Robert     Williams  M       6200

Desired output:

firstname  lastname  values_do_notExist_inOtherTable
Sam        Smith     true
Anna       Rose      false
Robert     Williams  false

CodePudding user response:

You can left join on both firstname and lastname, then construct values_do_notExist_inOtherTable from a null check on the joined columns.

from pyspark.sql import functions as F

df1_data = [("Sam", "Smith", "M", ), ("Anna", "Rose", "F", ), ("Robert", "Williams", "M", ), ]
df2_data = [("Gerogie", "Smith", "M", 3000, ), ("Anna", "Rose", "F", 4100, ), ("Robert", "Williams", "M", 6200, )]

df1 = spark.createDataFrame(df1_data, ("firstname", "lastname", "gender", ))
df2 = spark.createDataFrame(df2_data, ("firstname", "lastname", "gender", "salary", ))


def isValue_inTable(df1, df2, join_columns):
    return (df1.join(df2, on=join_columns, how="left")
               .withColumn("values_do_notExist_inOtherTable",
                           F.when(df2[join_columns[0]].isNull() |
                                  df2[join_columns[1]].isNull(), True)
                            .otherwise(False))
               .select(df1["firstname"], df1["lastname"], df1["gender"],
                       "values_do_notExist_inOtherTable"))

isValue_inTable(df1, df2, ["firstname", "lastname"]).show()

Output

+---------+--------+------+-------------------------------+
|firstname|lastname|gender|values_do_notExist_inOtherTable|
+---------+--------+------+-------------------------------+
|     Anna|    Rose|     F|                          false|
|   Robert|Williams|     M|                          false|
|      Sam|   Smith|     M|                           true|
+---------+--------+------+-------------------------------+