How to create a for loop in Scala to compare the columns and values of 2 Spark dataframes


I have df1 and df2 and I need to compare their columns and their values. If there are differences between them, I want to count them, so I can add a "Number miss match" column.

df1
+--------------------+--------+----------------+-----------+
|                  ID|    colA|            colB|       colC|
+--------------------+--------+----------------+-----------+
|(122C8984ABF9F6EF...|       0|              10|     APPLES|
|(122C8984ABF9F6EF...|       0|              20|     APPLES|
|(122C8984ABF9F6EF...|       0|              10|      PEARS|
|(122C8984ABF9F6EF...|       0|              10|     APPLES|
|(122C8984ABF9F6EF...|       0|              15|    CARROTS|
|(122C8984ABF9F6EF...|       0|              10|      APPLE|
+--------------------+--------+----------------+-----------+


df2
+--------------------+--------+----------------+-----------+
|                  ID|    colA|            colB|       colC|
+--------------------+--------+----------------+-----------+
|(122C8984ABF9F6EF...|       0|              10|     APPLES|
|(122C8984ABF9F6EF...|       0|              20|     APPLES|
|(122C8984ABF9F6EF...|       0|              10|     APPLES|
|(122C8984ABF9F6EF...|       0|              30|     APPLES|
|(122C8984ABF9F6EF...|       0|              15|    CARROTS|
|(122C8984ABF9F6EF...|       0|              15|      PEARS|
+--------------------+--------+----------------+-----------+

I can only use the ID when comparing them; the rest of the columns need to be handled dynamically. What I have done so far is rename the columns with a suffix and then join the two dataframes:

   import org.apache.spark.sql.functions.col

   val columns: Array[String] = df1.columns

   // Suffix each dataframe's columns so the joined result has unique names
   val df1prefixed = df1.columns.map(c => c + "_1")
   val df1_toDf = df1.toDF(df1prefixed: _*)

   val df2prefixed = df2.columns.map(c => c + "_2")
   val df2_toDf = df2.toDF(df2prefixed: _*)

   // Null-safe equality on ID; full outer join keeps rows missing on either side
   val joined = df1_toDf.join(df2_toDf, col("ID_1").eqNullSafe(col("ID_2")), "full_outer")
   display(joined)

What I'm trying to do next is to compare colA_1 with colA_2, and the same for every other column pair: if the values are equal, produce 0, otherwise 1, and then add a new column named "Number miss match" that holds the sum of those 0/1 results.

I'm trying a for loop in Scala, but I don't know how to finish it:

for (column <- columns) { when(col(column + "_1") =!= col(column + "_2"), 1).otherwise(0) }
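
For reference, here is roughly what that loop would need to become; a sketch only, assuming the joined dataframe and columns array defined above, and using Spark's null-safe <=> comparison so that rows missing on one side of the full outer join count as mismatches:

import org.apache.spark.sql.functions.{col, when}

// One 0/1 indicator per column pair: 0 when the values match, 1 when they differ.
// <=> is null-safe equality, so nulls from the full outer join are handled.
val mismatchIndicators = columns
  .filter(_ != "ID")
  .map(c => when(col(c + "_1") <=> col(c + "_2"), 0).otherwise(1))

// Sum the indicators into the "Number miss match" column
val result = joined.withColumn("Number miss match", mismatchIndicators.reduce(_ + _))
display(result)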

CodePudding user response:

I would strongly advise against using for loops in Spark: due to its parallelism and functional approach, you can get unexpected behaviour that is really hard to track down. Instead, I would suggest the except dataframe method, which compares dataframe 1 to dataframe 2 and creates a new dataframe containing the rows that are in df1 but not in the other df.
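
For illustration, a minimal sketch of that idea, assuming the df1 and df2 from the question (note that except behaves like SQL's EXCEPT DISTINCT, so duplicate rows are collapsed):

val onlyInDf1 = df1.except(df2)  // rows present in df1 but not in df2
val onlyInDf2 = df2.except(df1)  // except is not symmetric, so check both directions

println(s"Rows only in df1: ${onlyInDf1.count()}, rows only in df2: ${onlyInDf2.count()}")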

CodePudding user response:

You can loop over the columns and, for each one, create a single-column dataframe from each of your source dataframes, then use except to compare them. For example:

import spark.implicits._

val df1 = List((1, 3), (2, 4), (3, 6)).toDF("colA", "colB")
val df2 = List((1, 2), (2, 4), (3, 3)).toDF("colA", "colB")

df1.show()
//+----+----+
//|colA|colB|
//+----+----+
//|   1|   3|
//|   2|   4|
//|   3|   6|
//+----+----+

df2.show()
//+----+----+
//|colA|colB|
//+----+----+
//|   1|   2|
//|   2|   4|
//|   3|   3|
//+----+----+

val comparisonResultMap = df1.columns.map { colName =>
  val df1SingleCol = df1.select(colName)
  val df2SingleCol = df2.select(colName)
  // except must be checked in both directions, since it is not symmetric;
  // 1 means the column's values match in both dataframes, 0 means they differ
  val isEqual = if (df1SingleCol.except(df2SingleCol).isEmpty && df2SingleCol.except(df1SingleCol).isEmpty) 1 else 0
  (colName, isEqual)
}.toMap

print(comparisonResultMap)
// output: Map(colA -> 1, colB -> 0)
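
Note that this compares each column's values as a whole set rather than row by row: rows are matched by value instead of by ID, and since except removes duplicates, repeated values can hide differences. If you need the per-row "Number miss match" count from the question, the join-based sketch shown after the question's loop is a better fit.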
