new column in dataframe derived from second dataframe

Time:10-14

I have two dataframes, df1 and df2. I need to add new columns to df1 based on df2:

    df1
    X Y Z
    1 2 3
    4 5 6
    7 8 9
    3 6 9

    df2
    col1 col2
    XX   aa
    YY   bb
    XX   cc
    ZZ   vv

Each value of col1 in df2 should become a new column in df1 (if that column doesn't already exist), and the values of col2 should become the values of those new columns. For example:

    df1
    X Y Z XX  YY  ZZ
    1 2 3 aa  bb  vv
    4 5 6 cc
    7 8 9
    3 6 9

Answer:

Spark datasets are designed to be distributed, but column names are part of the schema, which lives on the driver. So to add a column for each distinct value of df2.col1, you first need to bring those values to the driver, i.e. collect them.

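```scala
// Assumes spark-shell, where a SparkSession named `spark` already exists
import org.apache.spark.sql.functions._
import spark.implicits._

// inputs
val df1 = List((1,2,3), (4,5,6), (7,8,9), (3,6,9)).toDF("X", "Y", "Z")
val df2 = List(("XX", "aa"), ("YY", "bb"), ("XX", "cc"), ("ZZ", "vv")).toDF("col1", "col2")

// Bring the distinct values of col1 to the driver
val newColumns = df2.select("col1").as[String].distinct.collect

// Add one placeholder column per value
val newDF = newColumns.foldLeft(df1)((df, name) => df.withColumn(name, lit("?")))
newDF.show
```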

```
+---+---+---+---+---+---+
|  X|  Y|  Z| ZZ| YY| XX|
+---+---+---+---+---+---+
|  1|  2|  3|  ?|  ?|  ?|
|  4|  5|  6|  ?|  ?|  ?|
|  7|  8|  9|  ?|  ?|  ?|
|  3|  6|  9|  ?|  ?|  ?|
+---+---+---+---+---+---+
```

However:

  • I don't know what values you want in those columns (above, I used "?" everywhere).
  • If df2.col1 has many distinct values (tens of thousands, say), collecting them and adding that many columns to df1 can overwhelm the driver.
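Both concerns can be handled a little more defensively. The following is a minimal standalone sketch, assuming the Spark 3.x Scala API and a local SparkSession; `maxNewColumns` is a hypothetical limit chosen for illustration. It fetches at most `maxNewColumns + 1` distinct names so an oversized result fails fast, skips names that already exist in df1 (the question's "if it doesn't exist"), and adds all new columns in one `select`. The Dataset.withColumn Scaladoc warns that calling withColumn in a loop builds large plans, so a single projection is the safer pattern when there are many columns.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

// Local session for the sketch; in spark-shell, `spark` already exists
val spark = SparkSession.builder().master("local[*]").appName("add-columns-sketch").getOrCreate()
import spark.implicits._

val df1 = List((1, 2, 3), (4, 5, 6), (7, 8, 9), (3, 6, 9)).toDF("X", "Y", "Z")
val df2 = List(("XX", "aa"), ("YY", "bb"), ("XX", "cc"), ("ZZ", "vv")).toDF("col1", "col2")

// Hypothetical cap on how many new columns we accept
val maxNewColumns = 1000

// Fetch at most maxNewColumns + 1 distinct names, so "too many" is detected
// without pulling every distinct value to the driver
val names: Array[String] = df2.select("col1").distinct().as[String].limit(maxNewColumns + 1).collect()
require(names.length <= maxNewColumns, s"df2.col1 has more than $maxNewColumns distinct values")

// Skip names that already exist in df1; sort for a stable column order
val toAdd: Array[String] = names.filterNot(n => df1.columns.contains(n)).sorted

// Build the full column list once and apply it in a single select,
// instead of one withColumn call per new column
val allColumns = df1.columns.map(c => col(c)) ++ toAdd.map(n => lit("?").as(n))
val newDF = df1.select(allColumns.toSeq: _*)
newDF.show()
```

The cap value is a judgment call; the point is to fail before collecting an unbounded number of names.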

To go a bit further, here is how to add a column for each value of df2.col1 and fill it with the concatenated values of df2.col2:

```scala
// One row per col1 value, with its distinct col2 values joined by ","
val toAdd = df2.groupBy("col1").agg(concat_ws(",", collect_set("col2")).as("col2All"))
toAdd.show
```

```
+----+-------+
|col1|col2All|
+----+-------+
|  ZZ|     vv|
|  YY|     bb|
|  XX|  cc,aa|
+----+-------+
```

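```scala
// Collect (column name -> concatenated values) pairs to the driver as a Map
val newColumns = toAdd.rdd.map(r => (r.getAs[String]("col1"), r.getAs[String]("col2All"))).collectAsMap()

// Add each pair as a column holding the same constant value in every row
val newDF = newColumns.foldLeft(df1){ case (df, (name, value)) => df.withColumn(name, lit(value)) }
newDF.show
```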

```
+---+---+---+-----+---+---+
|  X|  Y|  Z|   XX| YY| ZZ|
+---+---+---+-----+---+---+
|  1|  2|  3|cc,aa| bb| vv|
|  4|  5|  6|cc,aa| bb| vv|
|  7|  8|  9|cc,aa| bb| vv|
|  3|  6|  9|cc,aa| bb| vv|
+---+---+---+-----+---+---+
```
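The approach above repeats the same concatenated string in every row, whereas the question's example puts aa in the first row and cc in the second. A minimal standalone sketch of that layout follows, under loudly stated assumptions: df1 and df2 share no key, so rows are matched by position; position is derived from monotonically_increasing_id(), which for small local input like this follows insertion order, although Spark does not guarantee row order in general; the helper columns `_id` and `rn` are hypothetical names. It numbers each col2 value within its col1 group, pivots col1 into columns, and left-joins on that row number. Assumes the Spark 3.x Scala API.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, monotonically_increasing_id, row_number}

val spark = SparkSession.builder().master("local[*]").appName("pivot-by-position-sketch").getOrCreate()
import spark.implicits._

val df1 = List((1, 2, 3), (4, 5, 6), (7, 8, 9), (3, 6, 9)).toDF("X", "Y", "Z")
val df2 = List(("XX", "aa"), ("YY", "bb"), ("XX", "cc"), ("ZZ", "vv")).toDF("col1", "col2")

// Assumption: ordering by monotonically_increasing_id() reproduces input order here
val df1Indexed = df1
  .withColumn("_id", monotonically_increasing_id())
  .withColumn("rn", row_number().over(Window.orderBy("_id")))
  .drop("_id")

// Drop col1 values that already exist as columns in df1, then number each
// value within its col1 group: first XX -> 1, second XX -> 2, ...
val df2Indexed = df2
  .filter(!col("col1").isin(df1.columns.toSeq: _*))
  .withColumn("_id", monotonically_increasing_id())
  .withColumn("rn", row_number().over(Window.partitionBy("col1").orderBy("_id")))

// One row per rn, one column per distinct col1 value (pivot values come out sorted)
val wide = df2Indexed.groupBy("rn").pivot("col1").agg(first("col2"))

// Left join keeps every df1 row; cells with no matching value stay null
val newDF = df1Indexed.join(wide, Seq("rn"), "left").orderBy("rn").drop("rn")
newDF.show()
```

If the real data has a genuine key linking the two dataframes, joining on that key would be more reliable than matching rows by position.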