Home > OS >  Check for existence of duplicate column value tuples between two pyspark dataframes
Check for existence of duplicate column value tuples between two pyspark dataframes

Time:05-19

I have two dataframes and I want to compare one against the other using multiple columns, such that if the tuple of column values from one dataframe exists in the other dataframe, an indicator is placed in the first dataframe (e.g., tupleExistsInDf2 = True). Example code:

import pandas as pd
from datetime import date
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = pd.DataFrame({
    "pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 4],
    "date": [
        date("2022-05-06"),
        date("2022-05-13"),
        date("2022-05-06"),
        date("2022-05-06"),
        date("2022-05-14"),
        date("2022-05-15"),
        date("2022-05-05"),
        date("2022-05-05"),
        date("2022-05-11"),
        date("2022-05-12")
    ],
    "variable": [A, B, C, D, A, A, E, F, A, G]
})

df1 = spark.createDataFrame(df1)

df2 = pd.DataFrame({
    "pk": [1, 1, 2, 2, 2, 3, 4, 5, 6, 6],
    "date": [
        date("2022-05-06"),
        date("2022-05-13"),
        date("2022-05-14"),
        date("2022-05-15"),
        date("2022-05-05"),
        date("2022-05-11"),
        date("2022-05-08"), 
        date("2022-05-03"),
        date("2022-05-07"),
        date("2022-05-08")
    ],
    "variable": [A, B, A, A, E, A, A, H, A, A]

})

df2 = spark.createDataFrame(df2)

This yields two pyspark dataframes:

df1.show()

# ----- ----------- -------- 
#|pk   |       date|variable|
# ----- ----------- -------- 
#|    1| 2022-05-06|       A|
#|    1| 2022-05-13|       B|
#|    1| 2022-05-06|       C|
#|    1| 2022-05-06|       D|
#|    2| 2022-05-14|       A|
#|    2| 2022-05-15|       A|
#|    2| 2022-05-05|       E|
#|    2| 2022-05-05|       F|
#|    3| 2022-05-11|       A|
#|    4| 2022-05-12|       G|
# ----- ----------- -------- 

df2.show()

# ----- ----------- -------- 
#|pk   |       date|variable|
# ----- ----------- -------- 
#|    1| 2022-05-06|       A|
#|    1| 2022-05-13|       B|
#|    2| 2022-05-14|       A|
#|    2| 2022-05-15|       A|
#|    2| 2022-05-05|       E|
#|    3| 2022-05-11|       A|
#|    4| 2022-05-08|       A|
#|    5| 2022-05-03|       H|
#|    6| 2022-05-07|       A|
#|    6| 2022-05-08|       A|
# ----- ----------- -------- 

What I want to do is indicate in df1 whether the each tuple (pk, date, variable) exists in df2. The result for this example would be:

# ----- ----------- -------- ---------------- 
#|pk   |       date|variable|tupleExistsInDf2|
# ----- ----------- -------- ---------------- 
#|    1| 2022-05-06|       A|            True|
#|    1| 2022-05-13|       B|            True|
#|    1| 2022-05-06|       C|           False|
#|    1| 2022-05-06|       D|           False|
#|    2| 2022-05-14|       A|            True|
#|    2| 2022-05-15|       A|           False|
#|    2| 2022-05-05|       E|            True|
#|    2| 2022-05-05|       F|           False|
#|    3| 2022-05-11|       A|            True|
#|    4| 2022-05-12|       G|           False|
# ----- ----------- -------- ---------------- 

CodePudding user response:

It's probably enough to use the intersection, as that more useful than a column of False/True. But in case you need it here's the sudo code for what you asked for:

  1. Take the intersection
  2. Create the join condition
  3. Left outer Join and select only the columns required
    1. Use Coalesce to fill in nulls. naFill would also work.

.

import pyspark.sql.functions as f
intersection = df1.intersect(df2).withColumn("tupleExistsInDf2", f.lit(True) ) # collect columns that are in both tables.
cond = [df1.pk==intersection.pk, df1.date==intersection.date, df1.variable==intersection.variable] # create join condition
df1.join( intersection , cond,  "leftouter")\
  .select( df1.pk,df1.date,df1.variable, f.coalesce(intersection.tupleExistsInDf2, f.lit(False) ) ) 
  • Related