I need help with the requirement below. The data shown here is just a sample; in the real use case each DataFrame has more than 200 columns. I need to compare two DataFrames and flag the differences.
df1:
id,name,city
1,abc,pune
2,xyz,noida

df2:
id,name,city
1,abc,pune
2,xyz,bangalore
3,kk,mumbai

Expected DataFrame:
id,name,city,flag
1,abc,pune,same
2,xyz,bangalore,update
3,kk,mumbai,new
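For reference, the sample frames can be built like this (a minimal sketch, assuming an active SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(
    [(1, 'abc', 'pune'), (2, 'xyz', 'noida')],
    ['id', 'name', 'city'])
df2 = spark.createDataFrame(
    [(1, 'abc', 'pune'), (2, 'xyz', 'bangalore'), (3, 'kk', 'mumbai')],
    ['id', 'name', 'city'])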
Can someone please help me build this logic in PySpark?
Thanks in advance.
CodePudding user response:
PySpark's hash function can help identify the records that are different:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.hash.html
from pyspark.sql.functions import col, hash

# Hash the compared columns into a single value per row
df1 = df1.withColumn('hash_value', hash('id', 'name', 'city'))
df2 = df2.withColumn('hash_value', hash('id', 'name', 'city'))

# Rows that exist in both frames but whose hashes differ are updates
df_updates = df1.alias('a').join(
    df2.alias('b'),
    (col('a.id') == col('b.id')) & (col('a.hash_value') != col('b.hash_value')),
    how='inner')

# Keep the new values from df2
df_updates = df_updates.select('b.*')
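Note that this inner join only catches updates. Rows present only in df2 (the "new" ones in the expected output) can be found separately with a left anti join, for example:

# rows of df2 whose id does not appear in df1
df_new = df2.join(df1, on='id', how='left_anti')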
Once you have identified the records that are different, you can set up a function that loops through each column of the DataFrame and compares that column's value on both sides. Something like this should work:
from pyspark.sql.functions import col, when

def add_change_flags(df1, df2):
    # Alias both sides so same-named columns can be told apart after the join
    df_joined = df1.alias('df1').join(df2.alias('df2'), 'id', how='inner')
    for column in df1.columns:
        if column == 'id':  # the join key is equal by construction
            continue
        df_joined = df_joined.withColumn(
            column + "_change_flag",
            when(col(f"df1.{column}") == col(f"df2.{column}"), True)
            .otherwise(False))
    return df_joined
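A quick usage sketch with the sample frames (hypothetical run, results summarized in the comments):

changed = add_change_flags(df1, df2)
changed.select('id', 'name_change_flag', 'city_change_flag').show()
# id 1: both flags True (row unchanged)
# id 2: name_change_flag True, city_change_flag False (pune vs bangalore)
# id 3 is absent because the inner join drops rows missing from df1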
CodePudding user response:
Since both DataFrames have an id column, you can join them into one DataFrame containing all the columns. Use a left join from df2 (rather than an inner join) so rows that exist only in df2 are kept for the "new" flag:

from pyspark.sql.functions import col, when

joined_df = df2.alias('b').join(df1.alias('a'), col('a.id') == col('b.id'), how='left')
Once joined, you can append the new flag column (remember that withColumn returns a new DataFrame, so reassign it):

joined_df = joined_df.withColumn("flag", logic_condition_goes_here)
Nest when conditions like this:

joined_df = joined_df.withColumn(
    "flag",
    when(col("a.id").isNull(), "new")
    .when((col("a.name") == col("b.name")) & (col("a.city") == col("b.city")), "same")
    .otherwise("update"))
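Putting the two answers together for the real 200+ column case: hash every non-key column instead of listing them by hand, and left-join from df2 so new rows survive. This is only a sketch, assuming df1 and df2 are the original frames from the question; key, non_key, old_hash, and new_hash are names I've made up:

from pyspark.sql import functions as F

key = 'id'
non_key = [c for c in df1.columns if c != key]  # scales to 200+ columns

# one hash per row over every non-key column
old_hashes = df1.select(key, F.hash(*non_key).alias('old_hash'))

result = (df2.withColumn('new_hash', F.hash(*non_key))
             .join(old_hashes, on=key, how='left')
             .withColumn('flag',
                         F.when(F.col('old_hash').isNull(), 'new')
                          .when(F.col('old_hash') == F.col('new_hash'), 'same')
                          .otherwise('update'))
             .drop('old_hash', 'new_hash'))

result.show()  # 1 -> same, 2 -> update, 3 -> new, matching the expected output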