I have a Map[String, DataFrame]. I want to combine all the data inside that Map into a single DataFrame. Can a DataFrame have a column of Map datatype?
def sample(dfs: Map[String, DataFrame]): DataFrame = {
  // ...
}
Example:
DF1
id name age
1 aaa 23
2 bbb 34
DF2
game time score
ludo 10 20
rummy 30 40
I pass the above two DFs as a Map to the function. It should then put the data of each DataFrame into a single column of the output DataFrame, in JSON format.
out DF
+------------------------------------------------------------------------------------+
|column1                                                                             |
+------------------------------------------------------------------------------------+
|[{"id":"1","name":"aaa","age":"23"},{"id":"2","name":"bbb","age":"34"}]             |
|[{"game":"ludo","time":"10","score":"20"},{"game":"rummy","time":"30","score":"40"}]|
+------------------------------------------------------------------------------------+
CodePudding user response:
Here is a solution specific to your use-case:
import org.apache.spark.sql._

def sample(dfs: Map[String, DataFrame])(implicit spark: SparkSession): DataFrame =
  dfs
    .values
    .foldLeft(Option.empty[DataFrame]) {
      case (None, df)      => Some(df)       // first dataframe becomes the accumulator
      case (Some(acc), df) => Some(acc.union(df))
    }
    .getOrElse(spark.emptyDataFrame)
The SparkSession is only needed to return an empty DataFrame when the Map itself is empty. Folding directly onto spark.emptyDataFrame would fail, because union requires both sides to have the same number of columns and spark.emptyDataFrame has none.
Alternatively, if you can guarantee the Map is non-empty:
def sample(dfs: Map[String, DataFrame]): DataFrame =
  dfs
    .values
    .reduce(_ union _)
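For illustration, here is a minimal sketch of calling either variant. The SparkSession setup and the toy DataFrames are assumptions for the example, not from the original post; note that union only succeeds when all DataFrames in the Map share the same schema:
import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Two DataFrames with the same schema, so the unions are well-defined.
val a = Seq((1, "aaa"), (2, "bbb")).toDF("id", "name")
val b = Seq((3, "ccc"), (4, "ddd")).toDF("id", "name")

sample(Map("a" -> a, "b" -> b)).show()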
CodePudding user response:
You are asking to generate one row per dataframe. Be careful: if one of the dataframes is so large that it cannot fit in a single executor, this code will break.
Let's first generate the data and the map dfs of type Map[String, DataFrame].
import org.apache.spark.sql.functions._
import spark.implicits._

val df1 = Seq((1, "aaa", 23), (2, "bbb", 34)).toDF("id", "name", "age")
val df2 = Seq(("ludo", 10, 20), ("rummy", 10, 40)).toDF("game", "time", "score")
val dfs = Map("df0" -> df1, "df1" -> df2)
Then, for each dataframe of the map, we generate two columns: big_map associates each column name of the dataframe to its value (cast to string to have a consistent type), and df simply contains the name of the dataframe. We then union all the dataframes with reduce and group by df (that's the part where every single dataframe ends up entirely in one row, and therefore on one executor).
dfs
  .toSeq
  .map { case (name, df) =>
    df
      // big_map: column name -> value (as string) for every column of df
      .select(map(
        df.columns.flatMap(c => Seq(lit(c), col(c).cast("string"))): _*
      ) as "big_map")
      // df: the name of the source dataframe, used as the grouping key
      .withColumn("df", lit(name))
  }
  .reduce(_ union _)
  .groupBy("df")
  .agg(collect_list('big_map) as "column1")
  .show(false)
+---+-----------------------------------------------------------------------------------+
|df |column1                                                                            |
+---+-----------------------------------------------------------------------------------+
|df0|[{id -> 1, name -> aaa, age -> 23}, {id -> 2, name -> bbb, age -> 34}]             |
|df1|[{game -> ludo, time -> 10, score -> 20}, {game -> rummy, time -> 10, score -> 40}]|
+---+-----------------------------------------------------------------------------------+
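The question asked for JSON strings rather than Spark maps. Though not part of the original answer, one way to get that (assuming Spark 2.4+, where the standard to_json function accepts arrays of maps) is to wrap the collected list, replacing the aggregation line above:
  .agg(to_json(collect_list('big_map)) as "column1")
column1 then contains strings like [{"id":"1","name":"aaa","age":"23"},{"id":"2","name":"bbb","age":"34"}], matching the format requested in the question.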