Upsert/Merge two DataFrames in PySpark


I need some help with the following requirement. This is just sample data; in the real use case each DataFrame has more than 200 columns. I need to compare two DataFrames and flag the differences.

df1

id,name,city
1,abc,pune
2,xyz,noida

df2

id,name,city
1,abc,pune
2,xyz,bangalore
3,kk,mumbai

expected dataframe

id,name,city,flag
1,abc,pune,same
2,xyz,bangalore,update
3,kk,mumbai,new
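
For reference, the sample DataFrames can be built like this (a minimal sketch, assuming an existing local SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data only; the real DataFrames have 200+ columns
df1 = spark.createDataFrame(
    [(1, "abc", "pune"), (2, "xyz", "noida")],
    ["id", "name", "city"])
df2 = spark.createDataFrame(
    [(1, "abc", "pune"), (2, "xyz", "bangalore"), (3, "kk", "mumbai")],
    ["id", "name", "city"])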

Can someone please help me build this logic in PySpark?

Thanks in advance.

CodePudding user response:

PySpark's hash function can help identify the records that are different.

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.hash.html

from pyspark.sql.functions import col, hash

# Hash the columns of interest into a single value per row
df1 = df1.withColumn('hash_value', hash('id', 'name', 'city'))
df2 = df2.withColumn('hash_value', hash('id', 'name', 'city'))

# Rows present in both DataFrames whose hashes differ are updates
df_updates = df1.alias('a').join(
    df2.alias('b'),
    (col('a.id') == col('b.id')) & (col('a.hash_value') != col('b.hash_value')),
    how='inner')

df_updates = df_updates.select('b.*')
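
The join above only catches ids that exist in both DataFrames with changed values. Rows that are entirely new in df2 (id 3 in the sample) can be picked up separately, for example with a left anti join (a sketch, names assumed from the snippet above):

# Rows in df2 whose id does not appear in df1 at all are new records
df_new = df2.join(df1, on='id', how='left_anti')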

Once you have identified the records that are different, you can set up a function that loops through each column in the DataFrame and compares that column's value.

Something like this should work



from pyspark.sql.functions import col, when

def add_change_flags(df1, df2):
    # Alias both sides so each column can be referenced unambiguously after the join
    df_joined = df1.alias("df1").join(
        df2.alias("df2"), col("df1.id") == col("df2.id"), how="inner")

    # One boolean flag per column: True when both sides match, False otherwise
    for column in df1.columns:
        df_joined = df_joined.withColumn(
            column + "_change_flag",
            when(col(f"df1.{column}") == col(f"df2.{column}"), True).otherwise(False))

    return df_joined
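
A hypothetical call on the sample DataFrames would then add one boolean flag per column, something like:

# Example usage; the *_change_flag names come from the function above
flagged = add_change_flags(df1, df2)
flagged.select("name_change_flag", "city_change_flag").show()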

CodePudding user response:

Since you have an id column in each DataFrame, you can join the two DataFrames into one DataFrame containing all the columns.

joined_df = df1.join(df2,['id'],how='inner')

Once joined, you can append your new column [flag]:

joined_df.withColumn("flag", logic_condition_goes_here)

Nest when conditions like this, or apply a function:

joined_df.withColumn("flag",
  when(col("city1") === col("city2"),"Same")
  .when(col("name") === col("name2"),"Same")
  .otherwise("Not Same"))