How to populate Map[string,Dataframe] as a column in a Dataframe in scala

Time:11-25

I have a Map[String, DataFrame]. I want to combine all the data inside that Map into a single DataFrame. Can a DataFrame have a column of Map datatype?

def sample(dfs: Map[String, DataFrame]): DataFrame =
{
  .........
}

Example:

DF1

id name age
1  aaa  23
2  bbb  34

DF2

game  time  score
ludo  10    20
rummy 30    40 

I pass the above two DFs as a Map to the function. The data of each DataFrame should then go into a single column of the output DataFrame, in JSON format.

out DF

+--------------------------------------------------------------------------------------+
| column1                                                                              |
+--------------------------------------------------------------------------------------+
| [{"id":"1","name":"aaa","age":"23"},{"id":"2","name":"bbb","age":"34"}]              |
| [{"game":"ludo","time":"10","score":"20"},{"game":"rummy","time":"30","score":"40"}] |
+--------------------------------------------------------------------------------------+

CodePudding user response:

Here is a solution specific to your use-case:

import org.apache.spark.sql._

def sample(dfs: Map[String, DataFrame])(implicit spark: SparkSession): DataFrame =
  dfs
    .values
    .reduceOption((acc, df) => acc.union(df))
    .getOrElse(spark.emptyDataFrame)

The Spark session is required to create the empty DataFrame returned when the map is empty (folding a union over spark.emptyDataFrame would fail, since union requires both sides to have the same number of columns). Note that this only works if all the DataFrames in the map share the same schema.

Alternatively, if you can guarantee the map is non-empty, no Spark session is needed:

def sample(dfs : Map[String, DataFrame]): DataFrame =
  dfs
    .values
    .reduce((acc, df) => acc.union(df))
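The difference between the two variants can be sketched with plain Scala collections (no Spark needed); here list concatenation plays the role of union. foldLeft needs an explicit "empty" starting value and tolerates an empty collection, while reduce does not:

```scala
// Stand-in for Map[String, DataFrame]: lists of rows instead of DataFrames.
val dfs = Map("df0" -> List(1, 2), "df1" -> List(3, 4))

// foldLeft: safe on an empty map, but needs an explicit empty accumulator.
val folded = dfs.values.foldLeft(List.empty[Int])((acc, xs) => acc ++ xs)

// reduce: no accumulator needed, but throws UnsupportedOperationException
// when called on an empty collection.
val reduced = dfs.values.reduce((acc, xs) => acc ++ xs)
```

Both produce List(1, 2, 3, 4) here; only the empty-map behavior differs.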

CodePudding user response:

You are asking to generate one row per dataframe. Be careful: if one of the dataframes is too large to fit in a single executor, this code will break.

Let's first generate data and the map dfs of type Map[String, DataFrame].

import spark.implicits._  // for .toDF on Seq

val df1 = Seq((1, "aaa", 23), (2, "bbb", 34)).toDF("id", "name", "age")
val df2 = Seq(("ludo", 10, 20), ("rummy", 10, 40)).toDF("game", "time", "score")
val dfs = Map("df0" -> df1, "df1" -> df2)

Then, for each dataframe of the map, we generate two columns. big_map associates each column name of the dataframe to its value (cast to string to have a consistent type). df simply contains the name of the dataframe. We then union all the dataframes with reduce and group by name (that's the step where each dataframe ends up entirely in one row, and therefore on one executor).

import org.apache.spark.sql.functions._

dfs
  .toSeq
  .map { case (name, df) => df
    .select(map(
      df.columns.flatMap(c => Seq(lit(c), col(c).cast("string"))): _*
    ) as "big_map")
    .withColumn("df", lit(name)) }
  .reduce(_ union _)
  .groupBy("df")
  .agg(collect_list('big_map) as "column1")
  .show(false)
+---+-----------------------------------------------------------------------------------+
|df |column1                                                                            |
+---+-----------------------------------------------------------------------------------+
|df0|[{id -> 1, name -> aaa, age -> 23}, {id -> 2, name -> bbb, age -> 34}]             |
|df1|[{game -> ludo, time -> 10, score -> 20}, {game -> rummy, time -> 10, score -> 40}]|
+---+-----------------------------------------------------------------------------------+
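Note that show renders Scala map notation here, not the JSON strings the question asked for. Wrapping the aggregation in Spark's to_json (e.g. .agg(to_json(collect_list('big_map)) as "column1")) would produce actual JSON. The target string shape can be sketched in plain Scala; the toJsonArray helper below is hypothetical, for illustration only, and does no escaping:

```scala
// Hypothetical helper mimicking what to_json(collect_list(...)) produces
// for an array of string -> string maps (sketch only, no escaping).
def toJsonArray(rows: Seq[Map[String, String]]): String =
  rows
    .map(row => row.map { case (k, v) => s""""$k":"$v"""" }.mkString("{", ",", "}"))
    .mkString("[", ",", "]")

// Rows of DF1 from the question, as string -> string maps.
val df1Rows = Seq(
  Map("id" -> "1", "name" -> "aaa", "age" -> "23"),
  Map("id" -> "2", "name" -> "bbb", "age" -> "34")
)
```

toJsonArray(df1Rows) yields the [{"id":"1",...},...] shape shown in the question's expected output.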