I am trying to compare two datasets derived from Hive and Snowflake. These datasets have no unique identifier or primary key defined, so the challenge is to verify each company transaction in Hive against the corresponding Snowflake rows.
hive_df
+--------+------------+------+------+
|trans_id| comp_name  | dept | date |
+--------+------------+------+------+
|1101    | Johnaon&jho| acnt |2-3-21|
|1101    | Johnaon&jho| acnt |2-3-21|
|1102    | jeo_cop    | sales|3-2-21|
+--------+------------+------+------+
snowflake_df
+--------+------------+------+------+
|trans_id| comp_name  | dept | date |
+--------+------------+------+------+
|1101    | Johnaon&jho| acnt |2-3-21|
|1101    | Johnaon&jho| acnt |2-3-21|
|1102    | jeo_cop    | sales|3-2-21|
+--------+------------+------+------+
I have seen many examples that join two data frames on a unique id and compare the differences with PySpark, such as:
joined_df = source_df\
    .join(dest_df, 'key', 'full')
report = joined_df\
    .filter((source_df[column] != dest_df[column]))
But what is the solution when the dataset does not have a unique identifier, and how do I actually compare the rows? Any solution, please.
CodePudding user response:
I have used the following approach for many years. A union all, combined with a group by and conditional counts, gives you a full picture of the data and handles duplicates gracefully.
Generation of the sample dataframes:
cols = ["trans_id","comp_name","dept","date"]
hive_data = \
[("1101","Johnaon&jho","acnt","2-3-21")
,("1101","Johnaon&jho","acnt","2-3-21")
,("1102","jeo_cop","sales","3-2-21")]
snowflake_data = \
[("1101","Johnaon&jho","acnt","2-3-21")
,("1101","Johnaon&jho","acnt","2-3-21")
,("1102","jeo_cop","sales","3-2-21")]
hive_df = spark.createDataFrame(hive_data, cols)
snowflake_df = spark.createDataFrame(snowflake_data, cols)
hive_df.show()
+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
+--------+-----------+-----+------+
snowflake_df.show()
+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
+--------+-----------+-----+------+
Solution, based on unionAll & groupBy:
import pyspark.sql.functions as F

# Tag every row with the system it came from
hive_df_1 = hive_df.withColumn('source_system', F.lit('hive'))
snowflake_df_1 = snowflake_df.withColumn('source_system', F.lit('snow_flake'))

# Union both sides, group on all original columns, and count how many
# times each row appears in total and per source system
df_unionall = (hive_df_1
    .unionAll(snowflake_df_1)
    .groupBy(hive_df.columns)
    .agg(
        F.count("*").alias('total'),
        F.count(F.when(F.col('source_system') == "hive", 1)).alias('hive'),
        F.count(F.when(F.col('source_system') == "snow_flake", 1)).alias('snowflake')
    )
)
df_unionall.show()
+--------+-----------+-----+------+-----+----+---------+
|trans_id|  comp_name| dept|  date|total|hive|snowflake|
+--------+-----------+-----+------+-----+----+---------+
|    1101|Johnaon&jho| acnt|2-3-21|    4|   2|        2|
|    1102|    jeo_cop|sales|3-2-21|    2|   1|        1|
+--------+-----------+-----+------+-----+----+---------+
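If you only care about the rows that differ between the two systems, you can filter this aggregated result on the two per-source counts. A minimal follow-up sketch (the mismatches name is just for illustration):

# Rows whose hive and snowflake counts disagree are the mismatches;
# with the sample data above this returns an empty dataframe.
mismatches = df_unionall.filter(F.col('hive') != F.col('snowflake'))
mismatches.show()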
CodePudding user response:
You can use the subtract method.
Please refer to: here
df = hive_df.subtract(snowflake_df)
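One caveat: subtract behaves like SQL EXCEPT DISTINCT, so duplicate rows are collapsed, and you usually want to compare in both directions. A minimal sketch using exceptAll (available since Spark 2.4), which preserves duplicates, is shown below; the variable names are just for illustration:

# Rows in hive_df that are missing (or under-represented) in snowflake_df,
# and vice versa; exceptAll keeps duplicate rows, unlike subtract.
only_in_hive = hive_df.exceptAll(snowflake_df)
only_in_snowflake = snowflake_df.exceptAll(hive_df)

only_in_hive.show()
only_in_snowflake.show()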