I have a DataFrame with the following structure and data.
Source:
Column1(String), Column2(String), Date
-----------------------
1, 2, 01/01/2021
A, B, 05/01/2021
M, N, 10/01/2021
I want to transform it into the following: the first two columns are repeated, and the date is incremented by one day per row until it reaches the next row's date:
Column1(String), Column2(String), Date
-----------------------
1, 2, 01/01/2021
1, 2, 02/01/2021
1, 2, 03/01/2021
1, 2, 04/01/2021
A, B, 05/01/2021
A, B, 06/01/2021
A, B, 07/01/2021
A, B, 08/01/2021
A, B, 09/01/2021
M, N, 10/01/2021
Any idea how this can be achieved in Scala Spark?
CodePudding user response:
Here is the working solution:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._ // needed for toDF and $"..."; spark-shell imports it automatically

// Sample data: Date starts as a dd/MM/yyyy string and is parsed into a date column.
val dfp1 = List(("1001", 11, "01/10/2021"), ("1002", 21, "05/10/2021"), ("1001", 12, "10/10/2021"), ("1002", 22, "15/10/2021")).toDF("SerialNumber", "SomeVal", "Date")
val dfProducts = dfp1.withColumn("Date", to_date($"Date", "dd/MM/yyyy"))
dfProducts.show
+------------+-------+----------+
|SerialNumber|SomeVal|      Date|
+------------+-------+----------+
|        1001|     11|2021-10-01|
|        1002|     21|2021-10-05|
|        1001|     12|2021-10-10|
|        1002|     22|2021-10-15|
+------------+-------+----------+
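// Frame: every later row with the same SerialNumber, ordered by Date.
val overColumns = Window.partitionBy("SerialNumber").orderBy("Date").rowsBetween(1, Window.unboundedFollowing)
// The first non-null Date in that frame is the next date for the same SerialNumber (null on its last row).
val dfProduct1 = dfProducts.withColumn("NextSerialDate", first("Date", true).over(overColumns)).orderBy("Date")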
dfProduct1.show
+------------+-------+----------+--------------+
|SerialNumber|SomeVal|      Date|NextSerialDate|
+------------+-------+----------+--------------+
|        1001|     11|2021-10-01|    2021-10-10|
|        1002|     21|2021-10-05|    2021-10-15|
|        1001|     12|2021-10-10|          null|
|        1002|     22|2021-10-15|          null|
+------------+-------+----------+--------------+
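// End of each range: the day before the next date, or the row's own Date when there is no next row.
val dfProduct2 = dfProduct1.withColumn("NextSerialDate", when(col("NextSerialDate").isNull, col("Date")).otherwise(date_sub(col("NextSerialDate"), 1))).orderBy("SerialNumber")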
dfProduct2.show
+------------+-------+----------+--------------+
|SerialNumber|SomeVal|      Date|NextSerialDate|
+------------+-------+----------+--------------+
|        1001|     11|2021-10-01|    2021-10-09|
|        1001|     12|2021-10-10|    2021-10-10|
|        1002|     21|2021-10-05|    2021-10-14|
|        1002|     22|2021-10-15|    2021-10-15|
+------------+-------+----------+--------------+
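// sequence builds an array of every date from Date to NextSerialDate (inclusive); explode_outer emits one row per date.
val dfProduct3 = dfProduct2.withColumn("ExpandedDate", explode_outer(sequence($"Date", $"NextSerialDate")))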
dfProduct3.show
+------------+-------+----------+--------------+------------+
|SerialNumber|SomeVal|      Date|NextSerialDate|ExpandedDate|
+------------+-------+----------+--------------+------------+
|        1001|     11|2021-10-01|    2021-10-09|  2021-10-01|
|        1001|     11|2021-10-01|    2021-10-09|  2021-10-02|
|        1001|     11|2021-10-01|    2021-10-09|  2021-10-03|
|        1001|     11|2021-10-01|    2021-10-09|  2021-10-04|
|        1001|     11|2021-10-01|    2021-10-09|  2021-10-05|
|        1001|     11|2021-10-01|    2021-10-09|  2021-10-06|
|        1001|     11|2021-10-01|    2021-10-09|  2021-10-07|
|        1001|     11|2021-10-01|    2021-10-09|  2021-10-08|
|        1001|     11|2021-10-01|    2021-10-09|  2021-10-09|
|        1001|     12|2021-10-10|    2021-10-10|  2021-10-10|
|        1002|     21|2021-10-05|    2021-10-14|  2021-10-05|
|        1002|     21|2021-10-05|    2021-10-14|  2021-10-06|
|        1002|     21|2021-10-05|    2021-10-14|  2021-10-07|
|        1002|     21|2021-10-05|    2021-10-14|  2021-10-08|
|        1002|     21|2021-10-05|    2021-10-14|  2021-10-09|
|        1002|     21|2021-10-05|    2021-10-14|  2021-10-10|
|        1002|     21|2021-10-05|    2021-10-14|  2021-10-11|
|        1002|     21|2021-10-05|    2021-10-14|  2021-10-12|
|        1002|     21|2021-10-05|    2021-10-14|  2021-10-13|
|        1002|     21|2021-10-05|    2021-10-14|  2021-10-14|
+------------+-------+----------+--------------+------------+
only showing top 20 rows
// Drop the helper columns and keep the expanded dates under the original column name.
val dfProduct4 = dfProduct3.drop("Date", "NextSerialDate").withColumnRenamed("ExpandedDate", "Date")
dfProduct4.show(50, false)
+------------+-------+----------+
|SerialNumber|SomeVal|Date      |
+------------+-------+----------+
|1001        |11     |2021-10-01|
|1001        |11     |2021-10-02|
|1001        |11     |2021-10-03|
|1001        |11     |2021-10-04|
|1001        |11     |2021-10-05|
|1001        |11     |2021-10-06|
|1001        |11     |2021-10-07|
|1001        |11     |2021-10-08|
|1001        |11     |2021-10-09|
|1001        |12     |2021-10-10|
|1002        |21     |2021-10-05|
|1002        |21     |2021-10-06|
|1002        |21     |2021-10-07|
|1002        |21     |2021-10-08|
|1002        |21     |2021-10-09|
|1002        |21     |2021-10-10|
|1002        |21     |2021-10-11|
|1002        |21     |2021-10-12|
|1002        |21     |2021-10-13|
|1002        |21     |2021-10-14|
|1002        |22     |2021-10-15|
+------------+-------+----------+
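The example above fills dates per SerialNumber. The data in the question has no such grouping column, so the same steps can be condensed for that case. What follows is only a minimal sketch under a few assumptions: Spark 2.4 or later (for sequence), dates in dd/MM/yyyy format, and a window ordered by Date with no partitionBy. The names FillDates, fillDates, NextDate and EndDate are made up for illustration. It also swaps first over a shifted frame for lead, which reads the next row's Date directly.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object FillDates {
  // Repeats each row once per day, from its own Date up to the day before the next row's Date.
  // Assumes df has a date-typed column named "Date".
  def fillDates(df: DataFrame): DataFrame = {
    // Assumption: no grouping key, so the window has no partitionBy.
    // Spark then moves all rows into a single partition, which is fine for small data only.
    val byDate = Window.orderBy("Date")

    df.withColumn("NextDate", lead("Date", 1).over(byDate))
      // Last row has no next date, so its range ends on its own Date.
      .withColumn("EndDate", coalesce(date_sub(col("NextDate"), 1), col("Date")))
      // One output row per day in [Date, EndDate].
      .withColumn("Date", explode(sequence(col("Date"), col("EndDate"))))
      .drop("NextDate", "EndDate")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("FillDates").getOrCreate()
    import spark.implicits._

    // The three source rows from the question.
    val source = Seq(("1", "2", "01/01/2021"), ("A", "B", "05/01/2021"), ("M", "N", "10/01/2021"))
      .toDF("Column1", "Column2", "Date")
      .withColumn("Date", to_date(col("Date"), "dd/MM/yyyy"))

    fillDates(source).orderBy("Date").show(false)
    spark.stop()
  }
}
```

For the three source rows in the question this should give the ten rows listed there: four for (1, 2), five for (A, B) and one for (M, N). If the real data does have a key to group by, adding partitionBy on that key to the window, as in the answer above, avoids pulling everything into one partition.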