Home > Back-end >  Pyspark - Difference between 2 dataframes - Identify inserts, updates and deletes
Pyspark - Difference between 2 dataframes - Identify inserts, updates and deletes

Time:11-09

I have 2 dataframes df1(old) and df2(new). I am trying compare df2 with df1 and find the newly added rows, deleted rows, updated rows along with the names of the columns that got updated.

Here is the code that I have written

from pyspark.sql.functions import col, array, when, array_remove, lit

data1 = [("James","rob","Smith","36636","M",3000),
    ("Michael","Rose","jim","40288","M",4000),
    ("Robert","dunkin","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]

data2 = [("James","rob","Smith","36636","M",3000),
    ("Robert","dunkin","Williams","42114","M",2000),
    ("Maria","Anne","Jones","72712","F",3000),
    ("Yesh","Reddy","Brown","75234","M",3000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]
  schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c not in ['firstname','middlename','lastname']]

select_expr =[
                col("firstname"),col("middlename"),col("lastname"),
                *[df2[c] for c in df2.columns if c not in ['firstname','middlename','lastname']], 
                array_remove(array(*conditions_), "").alias("updated_columns")
]

df1.join(df2, ["firstname","middlename","lastname"],"inner").select(*select_expr).show()

Here is the output that I got

 --------- ---------- -------- ----- ------ ------ --------------- 
|firstname|middlename|lastname|   id|gender|salary|updated_columns|
 --------- ---------- -------- ----- ------ ------ --------------- 
|    James|       rob|   Smith|36636|     M|  3000|             []|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|
 --------- ---------- -------- ----- ------ ------ --------------- 

Here is the output that I am expecting

 --------- ---------- -------- ----- ------ ------ --------------- ----------------- 
|firstname|middlename|lastname|   id|gender|salary|updated_columns|           status|
 --------- ---------- -------- ----- ------ ------ --------------- ----------------- 
|    James|       rob|   Smith|36636|     M|  3000|             []|        unchanged|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|          updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|          deleted|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|          updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|            added|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|        unchanged|
 --------- ---------- -------- ----- ------ ------ --------------- ----------------- 

I know that I can find the added and deleted rows using the left anti joins separately. But, I am looking for ways to update my existing join to get the above output.

CodePudding user response:

An outer join would help in your case. I have modified the code you have given to do this and also include the status column.

Minimally Working Example

from pyspark.sql.functions import col, array, when, array_remove, lit, size, coalesce
from pyspark.sql.types import *

data1 = [("James","rob","Smith","36636","M",3000),
    ("Michael","Rose","jim","40288","M",4000),
    ("Robert","dunkin","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]

data2 = [("James","rob","Smith","36636","M",3000),
    ("Robert","dunkin","Williams","42114","M",2000),
    ("Maria","Anne","Jones","72712","F",3000),
    ("Yesh","Reddy","Brown","75234","M",3000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]
schema = StructType([
    StructField("firstname",StringType(),True),
    StructField("middlename",StringType(),True),
    StructField("lastname",StringType(),True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])
 
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c not in ['firstname','middlename','lastname']]

Logic for status Column and modified select_expr to coalesce values from df2 and df1 with preference given to df2 to get the update to date data.

status = when(df1["id"].isNull(), lit("added")).when(df2["id"].isNull(), lit("deleted")).when(size(array_remove(array(*conditions_), "")) > 0, lit("updated")).otherwise("unchanged")

select_expr =[
                col("firstname"),col("middlename"),col("lastname"),
                *[coalesce(df2[c], df1[c]).alias(c) for c in df2.columns if c not in ['firstname','middlename','lastname']],                
                array_remove(array(*conditions_), "").alias("updated_columns"),
                status.alias("status"),
]

Finally, applying an outer join.

df1.join(df2, ["firstname","middlename","lastname"],"outer").select(*select_expr).show()

Output

 --------- ---------- -------- ----- ------ ------ --------------- --------- 
|firstname|middlename|lastname|   id|gender|salary|updated_columns|   status|
 --------- ---------- -------- ----- ------ ------ --------------- --------- 
|    James|       rob|   Smith|36636|     M|  3000|             []|unchanged|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|unchanged|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|  updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|  deleted|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|  updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|    added|
 --------- ---------- -------- ----- ------ ------ --------------- --------- 

CodePudding user response:

I'd suggest using Ranger so you capture what actually changes, and when. But if you only have theses data frames... You want to do a "outer" join. (pulls all data from both tables into one join.) You already have the update column logic.

For Status: "deleted"(In df1 but not df2) and "additions"(in df2, but not in df1), (if there are update columns) --> "updated" otherwise "unchanged".

  • Related