Home > Net >  Compare 2 dataframes and create an output dataframe containing the name of the columns that contain
Compare 2 dataframes and create an output dataframe containing the name of the columns that contain

Time:02-24

Using Spark and Scala I have df1 and df2 as follows:

df1
 -------------------- -------- ---------------- ---------- 
|                  ID|colA.   |colB.           |colC       |
 -------------------- -------- ---------------- ---------- 
|                   1|       0|              10|     APPLES|
|                   2|       0|              20|     APPLES|
|.                  3|       0|              30|      PEARS|
 -------------------- -------- ---------------- ---------- 

 df2
 -------------------- -------- ---------------- ---------- 
|                  ID|colA.   |colB            |colC      |
 -------------------- -------- ---------------- ---------- 
|                   1|       0|              10|    APPLES|
|                   2|       0|              20|     PEARS|
|                   3|       0|              10|    APPLES|
 -------------------- -------- ---------------- ---------- 

I need to compare these 2 dataframes and extract differences in a df3 that contains 4 columns: Column Names that contains a difference, Value from df1, Value from df2, ID How can I achieve this without using the column names, I can only use the ID hard coded.

  -------------------- -------- ---------------- ------------- -----
|    Column Name     |Value from df1.          |Value from df2| ID |
 -------------------- -------- ---------------- -------------- -----
|              col B |                       30|            10| 3. |
|              col C |                   APPLES|         PEARS| 2. |
|              col C |                    PEARS|        APPLES| 3. |
 -------------------- -------- ---------------- --------------- ---- 

What I did so far is to extract the names of the columns that contain differences but I'm stuck on how to get the values. val columns = df1.columns val df_join = df1.alias("d1").join(df2.alias("d2"), col("d1.id") === col("d2.id"), "left")

val test = columns.foldLeft(df_join) {(df_join, name) => df_join.withColumn(name   
"_temp", when(col("d1."   name) =!= col("d2."   name), lit(name))))}
.withColumn("Col Name", concat_ws(",", columns.map(name => col(name   "_temp")): _*))

CodePudding user response:

You can try this way:

// Consider the below dataframes

df1.show()

 --- ---- ---- ------ 
| ID|colA|colB|  colC|
 --- ---- ---- ------ 
|  1|   0|  10|APPLES|
|  2|   0|  20|APPLES|
|  3|   0|  30| PEARS|
 --- ---- ---- ------ 

df2.show()

 --- ---- ---- ------ 
| ID|colA|colB|  colC|
 --- ---- ---- ------ 
|  1|   0|  10|APPLES|
|  2|   0|  20| PEARS|
|  3|   0|  10|APPLES|
 --- ---- ---- ------ 

// As ID column can be hardcoded, we can use it to exclude from the list of all the columns of the dataframe so that we will be left with the remaining columns

val df1_columns = df1.columns.to[ListBuffer].-=("ID")
val df2_columns = df2.columns.to[ListBuffer].-=("ID")

// obtain the number of columns to use it in the stack function later

val df1_columns_count = df1_columns.length
val df2_columns_count = df2_columns.length

// obtain the columns in dynamic way to use in the stack function

var df1_stack_str = ""
var df2_stack_str = ""

// Typecasting columns to string type to avoid conflicts

df1_columns.foreach { column =>
    df1_stack_str  = s"'$column',cast($column as string),"
}
df1_stack_str = df1_stack_str.substring(0,df1_stack_str.lastIndexOf(","))

// Typecasting columns to string type to avoid conflicts

df2_columns.foreach { column =>
    df2_stack_str  = s"'$column',cast($column as string),"
}
df2_stack_str = df2_stack_str.substring(0,df2_stack_str.lastIndexOf(","))

/*
In this case the stack function implementation would look like this

val df11 = df1.selectExpr("id","stack(3,'colA',cast(colA as string),'colB',cast(colB as string),'colC',cast(colC as string)) as (column_name,value_from_df1)")
val df21 = df2.selectExpr("id id_","stack(3,'colA',cast(colA as string),'colB',cast(colB as string),'colC',cast(colC as string)) as (column_name_,value_from_df2)")
*/

val df11 = df1.selectExpr("id",s"stack($df1_columns_count,$df1_stack_str) as (column_name,value_from_df1)")
val df21 = df2.selectExpr("id id_",s"stack($df2_columns_count,$df2_stack_str) as (column_name_,value_from_df2)")

// use inner join to get value_from_df1 and value_from_df2 in one dataframe and apply the filter

df11.as("df11").join(df21.as("df21"),expr("df11.id=df21.id_ and df11.column_name=df21.column_name_"))
    .drop("id_","column_name_")
    .filter("value_from_df1!=value_from_df2")
    .show
    
// Final output

 --- ----------- -------------- -------------- 
| id|column_name|value_from_df1|value_from_df2|
 --- ----------- -------------- -------------- 
|  2|       colC|        APPLES|         PEARS|
|  3|       colB|            30|            10|
|  3|       colC|         PEARS|        APPLES|
 --- ----------- -------------- -------------- 
  • Related