A solution in Pandas or PySpark is fine either way; I am mainly interested in the logic.
I have two dataframes:
df_1 =
id_1 id_2 value
ABC XYZ AA
ABA XYY null
ABD YYZ null
ABD ZYZ A
ABB XYY AA-
ACC XZY A--
BBB YYY null
df_2 =
id_1 id_2 value
ABC XYZ AA
ABA XYY CCC
ABD YYZ UNDEF
ABD ZYZ A-
ABB XYY AA-
How do I check that, for every pair of id_1 and id_2 in df_2, the pair exists in df_1 with the same value? I want to track the problematic rows where there isn't a match:
expected_output =
id_1 id_2 value value_actual
ABA XYY null CCC
ABD YYZ null UNDEF
ABD ZYZ A A-
I overcomplicated things because I initially wanted a result where I could check multiple attributes, and did:
from pyspark.sql import Window
from pyspark.sql.functions import array, col, expr, last, lit, row_number, split

def matching_spark(df_1, df_2):
    # number the rows of each frame so matching rows share an index after the union
    df_1 = df_1.withColumn('index', row_number().over(Window.partitionBy().orderBy('id_1', 'id_2')))
    df_2 = df_2.withColumn('index', row_number().over(Window.partitionBy().orderBy('id_1', 'id_2')))
    s = df_1.unionByName(df_2).orderBy('id_1', 'id_2', 'index')
    # collect the compared attributes into an array and keep their names
    s = s.withColumn('change', array('value'))
    s = s.withColumn('cols', split(lit('value'), r'\,'))
    # compare each row's array with the last row sharing the same index and keep the differences
    s = s.withColumn('change1', last('change').over(Window.partitionBy('index').orderBy('id_1', 'id_2'))) \
         .where(col('change') != col('change1'))
    s = s.withColumn('change2', expr("transform(change, (c, i) -> change[i] != change1[i])")) \
         .withColumn('faulty_attr', expr("filter(cols, (x, j) -> (change2[j]))")) \
         .drop('index', 'change', 'cols', 'change1', 'change2')
    return s
But nothing comes out of it, and I believe there is a simpler join-based solution for the example above.
CodePudding user response:
merge and filter:
(df_1.merge(df_2, on=['id_1', 'id_2'], suffixes=(None, '_actual'))
     .query('value != value_actual')
)
output:
id_1 id_2 value value_actual
1 ABA XYY NaN CCC
2 ABD YYZ NaN UNDEF
3 ABD ZYZ A A-
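Since the question also allows PySpark, here is a minimal sketch of the same join-and-filter idea, assuming df_1 and df_2 are the Spark versions of the frames above (eqNullSafe is used so the null-vs-value rows are kept rather than dropped by the comparison):
from pyspark.sql import functions as F

result = (
    df_2.withColumnRenamed('value', 'value_actual')
        .join(df_1, on=['id_1', 'id_2'], how='inner')          # keep only id pairs present in both frames
        .where(~F.col('value').eqNullSafe(F.col('value_actual')))  # null-safe "values differ"
        .select('id_1', 'id_2', 'value', 'value_actual')
)
result.show()
A plain value != value_actual filter would silently drop the rows where value is null, which is why the null-safe equality is negated instead.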