I have df1 and df2 and I need to compare their columns; where there are differences between them, I want to count them, so I can add a mismatch-count column.
df1
+--------------------+--------+----------------+----------+
|                  ID|    colA|            colB|      colC|
+--------------------+--------+----------------+----------+
|(122C8984ABF9F6EF...|       0|              10|    APPLES|
|(122C8984ABF9F6EF...|       0|              20|    APPLES|
|(122C8984ABF9F6EF...|       0|              10|     PEARS|
|(122C8984ABF9F6EF...|       0|              10|    APPLES|
|(122C8984ABF9F6EF...|       0|              15|   CARROTS|
|(122C8984ABF9F6EF...|       0|              10|     APPLE|
+--------------------+--------+----------------+----------+
df2
+--------------------+--------+----------------+----------+
|                  ID|    colA|            colB|      colC|
+--------------------+--------+----------------+----------+
|(122C8984ABF9F6EF...|       0|              10|    APPLES|
|(122C8984ABF9F6EF...|       0|              20|    APPLES|
|(122C8984ABF9F6EF...|       0|              10|    APPLES|
|(122C8984ABF9F6EF...|       0|              30|    APPLES|
|(122C8984ABF9F6EF...|       0|              15|   CARROTS|
|(122C8984ABF9F6EF...|       0|              15|     PEARS|
+--------------------+--------+----------------+----------+
I can only use the ID when comparing them; the rest of the columns need to be handled dynamically. What I did so far is to rename the columns and then join the two dataframes:
import org.apache.spark.sql.functions.col

val columns: Array[String] = df1.columns

val df1prefixed = df1.columns.map(c => c + "_1")
val df1_toDf = df1.toDF(df1prefixed: _*)

val df2prefixed = df2.columns.map(c => c + "_2")
val df2_toDf = df2.toDF(df2prefixed: _*)

val joined = df1_toDf.join(df2_toDf, col("ID_1").eqNullSafe(col("ID_2")), "full_outer")
display(joined)
What I'm trying to do next is to compare colA_1 with colA_2 and output 0 if they are equal, otherwise 1, and to do the same for all the other columns, then add a new column named "Number miss match" holding the 0 or 1 from each comparison.
I'm trying a for loop in Scala but I can't get it to work:
for (column <- columns) { when(col(column + "_1") =!= col(column + "_2"), 1).otherwise(0) }
CodePudding user response:
I would strongly advise against using for loops in Spark; because of its parallelism and functional approach you can get unexpected behaviours that are really hard to track down. Instead I would suggest using the except
dataframe method, which compares dataframe 1 to dataframe 2 and creates a new dataframe containing the rows that are in df1 but not in the other df.
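For example, a minimal sketch with toy data (an assumed shape just to illustrate except, not the asker's real dataframes):

import spark.implicits._

val df1 = List((1, "APPLES"), (2, "PEARS")).toDF("id", "colC")
val df2 = List((1, "APPLES"), (2, "CARROTS")).toDF("id", "colC")

// Rows present in df1 but missing from df2
df1.except(df2).show()
// +--+-----+
// |id| colC|
// +--+-----+
// | 2|PEARS|
// +--+-----+

// except is one-directional; a symmetric difference needs both calls
val diffCount = df1.except(df2).count() + df2.except(df1).count()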
CodePudding user response:
You can loop over the columns, creating each time a single-column dataframe from each of your source dataframes, and use except
to compare them. For example:
import spark.implicits._
val df1 = List((1, 3), (2, 4), (3, 6)).toDF("colA", "colB")
val df2 = List((1, 2), (2, 4), (3, 3)).toDF("colA", "colB")
df1.show()
// +----+----+
// |colA|colB|
// +----+----+
// |   1|   3|
// |   2|   4|
// |   3|   6|
// +----+----+
df2.show()
// +----+----+
// |colA|colB|
// +----+----+
// |   1|   2|
// |   2|   4|
// |   3|   3|
// +----+----+
val comparisonResultMap = df1.columns.map { colName =>
  val df1SingleCol = df1.select(colName)
  val df2SingleCol = df2.select(colName)
  // 1 if the column holds the same distinct values in both dataframes, 0 otherwise
  val isEqual = if (df1SingleCol.except(df2SingleCol).isEmpty && df2SingleCol.except(df1SingleCol).isEmpty) 1 else 0
  (colName, isEqual)
}.toMap
print(comparisonResultMap)
// output: Map(colA -> 1, colB -> 0)
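If what is needed instead is the per-row "Number miss match" column from the question, a sketch along these lines could build on the prefixed join already shown there (the column filtering and the null handling are assumptions, not tested against the real data):

import org.apache.spark.sql.functions.{col, when}

// One 0/1 mismatch flag per compared column, summed into a single count.
// Note: with the full_outer join, a null-vs-null pair counts as a match here.
val dataColumns = columns.filter(_ != "ID")
val mismatchCount = dataColumns
  .map(c => when(col(c + "_1") =!= col(c + "_2"), 1).otherwise(0))
  .reduce(_ + _)

val result = joined.withColumn("Number miss match", mismatchCount)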