I have this input dataframe:
inputDF =
+------------------+
|days (Seq[String])|
+------------------+
|[sat, sun]        |
|[mon, wed]        |
|[fri]             |
|[fri, sat]        |
|[mon, sun, sat]   |
+------------------+
I would like to obtain this outputDF, containing all existing Strings from the days column:
outputDF =
+------------------+-------------------------+
|days (Seq[String])|all days (Seq[String])   |
+------------------+-------------------------+
|[sat, sun]        |[sat, sun, mon, wed, fri]|
|[mon, wed]        |[sat, sun, mon, wed, fri]|
|[fri]             |[sat, sun, mon, wed, fri]|
|[fri, sat]        |[sat, sun, mon, wed, fri]|
|[mon, sun, sat]   |[sat, sun, mon, wed, fri]|
+------------------+-------------------------+
How can I do that in Scala/Spark, please?
CodePudding user response:
Assuming this is our input, and it is called dataset:
+---------------+
|days           |
+---------------+
|[sat, sun]     |
|[mon, wed]     |
|[fri]          |
|[fri, sat]     |
|[mon, sun, sat]|
+---------------+
We can get to this output:
+---------------+-------------------------+
|days           |all_days                 |
+---------------+-------------------------+
|[sat, sun]     |[fri, sat, sun, mon, wed]|
|[mon, wed]     |[fri, sat, sun, mon, wed]|
|[fri]          |[fri, sat, sun, mon, wed]|
|[fri, sat]     |[fri, sat, sun, mon, wed]|
|[mon, sun, sat]|[fri, sat, sun, mon, wed]|
+---------------+-------------------------+
Through the following code:
import org.apache.spark.sql.functions.{array_distinct, collect_set, flatten, lit}

// First we add a constant key so that every row ends up in the same group
val withKey = dataset.withColumn("id", lit(1))

// Group by that key, collect the arrays, flatten them and keep the distinct values;
// this yields a single row: (1, [fri, sat, sun, mon, wed])
val collected = withKey
  .groupBy("id")
  .agg(array_distinct(flatten(collect_set("days"))).as("all_days"))

// Join the collected days back onto the main table and drop the helper key
val result = withKey
  .join(collected, Seq("id"), "left")
  .drop("id")
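If you prefer to avoid the explicit join, here is a minimal sketch of the same idea with a window function (assuming Spark 2.4+ for flatten/array_distinct; an empty partitionBy() moves all rows into a single partition, so this only suits small datasets). The name resultViaWindow is just illustrative:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{array_distinct, collect_set, flatten}

// Collect every days array over a window spanning the whole dataframe,
// then flatten and deduplicate the collected arrays on each row
val resultViaWindow = dataset.withColumn(
  "all_days",
  array_distinct(flatten(collect_set("days").over(Window.partitionBy())))
)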
Good luck!
CodePudding user response:
You can create another dataset that contains the unique days values, then join it back to your initial dataset:
import spark.implicits._
import org.apache.spark.sql.functions.{broadcast, col, collect_set, explode, lit}

val data = Seq(
  Seq("sat", "sun"),
  Seq("mon", "wed"),
  Seq("fri"),
  Seq("fri", "sat"),
  Seq("mon", "sun", "sat")
)
val df = spark.sparkContext.parallelize(data).toDF("days")

// Explode the arrays and collect the distinct values into a single-row dataframe
val allDf = df
  .select(explode(col("days")).as("days"))
  .agg(collect_set("days").as("all_days"))
  .withColumn("join_column", lit(1))

// Join it back to every row of the original dataframe on the constant key
df.withColumn("join_column", lit(1))
  .join(broadcast(allDf), Seq("join_column"), "left")
  .drop("join_column")
  .show(false)
+---------------+-------------------------+
|days           |all_days                 |
+---------------+-------------------------+
|[sat, sun]     |[fri, sun, wed, mon, sat]|
|[mon, wed]     |[fri, sun, wed, mon, sat]|
|[fri]          |[fri, sun, wed, mon, sat]|
|[fri, sat]     |[fri, sun, wed, mon, sat]|
|[mon, sun, sat]|[fri, sun, wed, mon, sat]|
+---------------+-------------------------+
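Since allDf only ever contains a single row, broadcasting it keeps the join cheap. As a sketch of an equivalent variant (same imports as above), you could also drop the helper join_column and use an explicit cross join; allDays is just an illustrative name:

// allDays is a one-row dataframe holding the distinct day values
val allDays = df
  .select(explode(col("days")).as("day"))
  .agg(collect_set("day").as("all_days"))

// The cross join attaches that single row to every row of df
df.crossJoin(broadcast(allDays)).show(false)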