Check if PySpark column values exist in another dataframe's column values


I'm trying to figure out how to check whether the values of one PySpark dataframe exist in another PySpark dataframe and, if so, extract the matching value and compare it again. I was thinking of chaining multiple withColumn() calls with a when() function.

For example my two dataframes can be something like:

df1
| id    | value |
| ----- | ----  |
| hello | 1111  |
| world | 2222  |

df2
| id     | value |
| ------ | ----  |
| hello  | 1111  |
| world  | 3333  |
| people | 2222  |

The result I wish to obtain is to first check whether the value of df1.id exists in df2.id and, if true, return df2.value. For example, I was trying something like:

df1 = df1.withColumn("df2_value", when(df1.id == df2.id, df2.value))

So I get something like:

df1
| id    | value | df2_value |
| ----- | ----  | --------- |
| hello | 1111  | 1111      |
| world | 2222  | 3333      |

So that now I can do another check between these two value columns in the df1 dataframe and return a boolean column (1 or 0) in a new dataframe.

The result I wish to get would be something like:

df3
| id    | value | df2_value | match |
| ----- | ----  | --------- | ----- |
| hello | 1111  | 1111      | 1     |
| world | 2222  | 3333      | 0     |

CodePudding user response:

Left join df1 with df2 on id, after prefixing every df2 column except id with df2_:

from pyspark.sql import functions as F

df1 = spark.createDataFrame([("hello", 1111), ("world", 2222)], ["id", "value"])
df2 = spark.createDataFrame([("hello", 1111), ("world", 3333), ("people", 2222)], ["id", "value"])

df = df1.join(
    df2.select("id", *[F.col(c).alias(f"df2_{c}") for c in df2.columns if c != 'id']),
    ["id"],
    "left"
)

Then using functools.reduce you can construct a boolean expression to check if columns match in the 2 dataframes like this:

from functools import reduce

check_expr = reduce(
    lambda acc, x: acc & (F.col(x) == F.col(f"df2_{x}")),
    [c for c in df1.columns if c != 'id'],
    F.lit(True)
)
    
df.withColumn("match", check_expr.cast("int")).show()
# +-----+-----+---------+-----+
# |   id|value|df2_value|match|
# +-----+-----+---------+-----+
# |hello| 1111|     1111|    1|
# |world| 2222|     3333|    0|
# +-----+-----+---------+-----+