PySpark: how to compare two data frames without a unique/primary identifier


I am trying to compare two data sets derived from Hive and Snowflake. These data sets don't have a unique identifier or primary key defined, so the challenge is to verify each company's transactions from Hive against the corresponding Snowflake rows.

hive_df

+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
+--------+-----------+-----+------+

snowflake_df

+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
+--------+-----------+-----+------+

I have seen many examples that join two data frames on a unique id and compare the differences in PySpark, such as:

joined_df = source_df.join(dest_df, 'key', 'full')
report = joined_df.filter(source_df[column] != dest_df[column])

But what is the solution when the data set does not have a unique identifier, and how do you actually compare the rows? Any solution please.

CodePudding user response:

I have used the following approach for many years. A union all, combined with a group by and conditional counts, gives you a full picture of the data and handles duplicates gracefully.

Generation of the sample dataframes

cols = ["trans_id","comp_name","dept","date"]

hive_data = \
[("1101","Johnaon&jho","acnt","2-3-21")
,("1101","Johnaon&jho","acnt","2-3-21")
,("1102","jeo_cop","sales","3-2-21")]

snowflake_data = \
[("1101","Johnaon&jho","acnt","2-3-21")
,("1101","Johnaon&jho","acnt","2-3-21")
,("1102","jeo_cop","sales","3-2-21")]

hive_df = spark.createDataFrame(hive_data, cols)
snowflake_df = spark.createDataFrame(snowflake_data, cols)

hive_df.show()

+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
+--------+-----------+-----+------+

snowflake_df.show()

+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
+--------+-----------+-----+------+

Solution, based on unionAll & groupBy

import pyspark.sql.functions as F

# Tag each row with its source system before the union
hive_df_1 = hive_df.withColumn('source_system', F.lit('hive'))
snowflake_df_1 = snowflake_df.withColumn('source_system', F.lit('snow_flake'))

# Union both tagged dataframes, group by every original column,
# and count how many copies of each row came from each source
df_unionall = (hive_df_1
               .unionAll(snowflake_df_1)
               .groupBy(hive_df.columns)
               .agg(
                 F.count("*").alias('total'),
                 F.count(F.when(F.col('source_system') == "hive", 1)).alias('hive'),
                 F.count(F.when(F.col('source_system') == "snow_flake", 1)).alias('snowflake')
               )
              )

df_unionall.show()

+--------+-----------+-----+------+-----+----+---------+
|trans_id|  comp_name| dept|  date|total|hive|snowflake|
+--------+-----------+-----+------+-----+----+---------+
|    1101|Johnaon&jho| acnt|2-3-21|    4|   2|        2|
|    1102|    jeo_cop|sales|3-2-21|    2|   1|        1|
+--------+-----------+-----+------+-----+----+---------+
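
Rows that match across both sources will show equal hive and snowflake counts, so to surface only discrepancies you can filter on unequal counts. A minimal sketch building on df_unionall above (the mismatches name is just illustrative):

# Keep only rows whose copy counts differ between the two sources;
# on the sample data above this returns an empty result
mismatches = df_unionall.filter(F.col('hive') != F.col('snowflake'))
mismatches.show()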

CodePudding user response:

You can use the subtract method; see the PySpark documentation for DataFrame.subtract.

df = hive_df.subtract(snowflake_df)
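
Note that subtract behaves like SQL EXCEPT DISTINCT: it drops duplicate rows and only checks one direction. If duplicate counts matter (as in the sample data, where trans_id 1101 appears twice), exceptAll preserves duplicates, and you need to compare in both directions. A minimal sketch (the hive_only and snowflake_only names are just illustrative):

# Rows in hive_df that are missing from snowflake_df, respecting duplicate counts
hive_only = hive_df.exceptAll(snowflake_df)

# Rows in snowflake_df that are missing from hive_df
snowflake_only = snowflake_df.exceptAll(hive_df)

# Both are empty when the two dataframes match exactly
hive_only.show()
snowflake_only.show()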