Replication of DataFrame rows based on Date

Time:06-10

I have a DataFrame with the following structure and data:

Source:

Column1(String), Column2(String), Date
-----------------------
1, 2, 01/01/2021
A, B, 05/01/2021 
M, N, 10/01/2021

I want to transform it into the following, where the first two column values are replicated and the date is incremented by one day until the day before the next row's date:

Column1(String), Column2(String), Date
-----------------------
1, 2, 01/01/2021
1, 2, 02/01/2021
1, 2, 03/01/2021
1, 2, 04/01/2021
A, B, 05/01/2021 
A, B, 06/01/2021 
A, B, 07/01/2021 
A, B, 08/01/2021
A, B, 09/01/2021
M, N, 10/01/2021

Any idea how this can be achieved in Scala Spark?

Answer:

Here is the working solution:

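    import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell
    import org.apache.spark.sql.functions._
    val dfp1 = List(("1001", 11, "01/10/2021"), ("1002", 21, "05/10/2021"), ("1001", 12, "10/10/2021"), ("1002", 22, "15/10/2021")).toDF("SerialNumber", "SomeVal", "Date")
    // Parse the dd/MM/yyyy strings into a DateType column
    val dfProducts = dfp1.withColumn("Date", to_date($"Date", "dd/MM/yyyy"))
    dfProducts.show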
    +------------+-------+----------+
    |SerialNumber|SomeVal|      Date|
    +------------+-------+----------+
    |        1001|     11|2021-10-01|
    |        1002|     21|2021-10-05|
    |        1001|     12|2021-10-10|
    |        1002|     22|2021-10-15|
    +------------+-------+----------+

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    // For each row, look only at the later rows of the same SerialNumber
    val overColumns = Window.partitionBy("SerialNumber").orderBy("Date").rowsBetween(1, Window.unboundedFollowing)
    // The first non-null Date in that frame is the serial's next date (null on its last row)
    val dfProduct1 = dfProducts.withColumn("NextSerialDate", first("Date", true).over(overColumns)).orderBy("Date")
    dfProduct1.show
    +------------+-------+----------+--------------+
    |SerialNumber|SomeVal|      Date|NextSerialDate|
    +------------+-------+----------+--------------+
    |        1001|     11|2021-10-01|    2021-10-10|
    |        1002|     21|2021-10-05|    2021-10-15|
    |        1001|     12|2021-10-10|          null|
    |        1002|     22|2021-10-15|          null|
    +------------+-------+----------+--------------+

    // End of each range: the day before the next date, or the row's own date on the last row
    val dfProduct2 = dfProduct1.withColumn("NextSerialDate", when(col("NextSerialDate").isNull, col("Date")).otherwise(date_sub(col("NextSerialDate"), 1))).orderBy("SerialNumber")
    dfProduct2.show

    +------------+-------+----------+--------------+
    |SerialNumber|SomeVal|      Date|NextSerialDate|
    +------------+-------+----------+--------------+
    |        1001|     11|2021-10-01|    2021-10-09|
    |        1001|     12|2021-10-10|    2021-10-10|
    |        1002|     21|2021-10-05|    2021-10-14|
    |        1002|     22|2021-10-15|    2021-10-15|
    +------------+-------+----------+--------------+

  

    // Generate one row per day from Date to NextSerialDate, inclusive
    val dfProduct3 = dfProduct2.withColumn("ExpandedDate", explode_outer(sequence($"Date", $"NextSerialDate")))
    dfProduct3.show
    +------------+-------+----------+--------------+------------+
    |SerialNumber|SomeVal|      Date|NextSerialDate|ExpandedDate|
    +------------+-------+----------+--------------+------------+
    |        1001|     11|2021-10-01|    2021-10-09|  2021-10-01|
    |        1001|     11|2021-10-01|    2021-10-09|  2021-10-02|
    |        1001|     11|2021-10-01|    2021-10-09|  2021-10-03|
    |        1001|     11|2021-10-01|    2021-10-09|  2021-10-04|
    |        1001|     11|2021-10-01|    2021-10-09|  2021-10-05|
    |        1001|     11|2021-10-01|    2021-10-09|  2021-10-06|
    |        1001|     11|2021-10-01|    2021-10-09|  2021-10-07|
    |        1001|     11|2021-10-01|    2021-10-09|  2021-10-08|
    |        1001|     11|2021-10-01|    2021-10-09|  2021-10-09|
    |        1001|     12|2021-10-10|    2021-10-10|  2021-10-10|
    |        1002|     21|2021-10-05|    2021-10-14|  2021-10-05|
    |        1002|     21|2021-10-05|    2021-10-14|  2021-10-06|
    |        1002|     21|2021-10-05|    2021-10-14|  2021-10-07|
    |        1002|     21|2021-10-05|    2021-10-14|  2021-10-08|
    |        1002|     21|2021-10-05|    2021-10-14|  2021-10-09|
    |        1002|     21|2021-10-05|    2021-10-14|  2021-10-10|
    |        1002|     21|2021-10-05|    2021-10-14|  2021-10-11|
    |        1002|     21|2021-10-05|    2021-10-14|  2021-10-12|
    |        1002|     21|2021-10-05|    2021-10-14|  2021-10-13|
    |        1002|     21|2021-10-05|    2021-10-14|  2021-10-14|
    +------------+-------+----------+--------------+------------+
    only showing top 20 rows

    // Keep only the expanded date, renamed back to Date
    val dfProduct4 = dfProduct3.drop("Date", "NextSerialDate").withColumnRenamed("ExpandedDate", "Date")
    dfProduct4.show(50, false)
    +------------+-------+----------+
    |SerialNumber|SomeVal|Date      |
    +------------+-------+----------+
    |1001        |11     |2021-10-01|
    |1001        |11     |2021-10-02|
    |1001        |11     |2021-10-03|
    |1001        |11     |2021-10-04|
    |1001        |11     |2021-10-05|
    |1001        |11     |2021-10-06|
    |1001        |11     |2021-10-07|
    |1001        |11     |2021-10-08|
    |1001        |11     |2021-10-09|
    |1001        |12     |2021-10-10|
    |1002        |21     |2021-10-05|
    |1002        |21     |2021-10-06|
    |1002        |21     |2021-10-07|
    |1002        |21     |2021-10-08|
    |1002        |21     |2021-10-09|
    |1002        |21     |2021-10-10|
    |1002        |21     |2021-10-11|
    |1002        |21     |2021-10-12|
    |1002        |21     |2021-10-13|
    |1002        |21     |2021-10-14|
    |1002        |22     |2021-10-15|
    +------------+-------+----------+
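
The answer above partitions by SerialNumber, but the data in the question has no such key. Here is a minimal sketch of the same idea applied to the question's three rows, assuming Spark 2.4 or later (needed for `sequence`) and a local SparkSession. The names `source`, `byDate`, `NextDate`, `EndDate` and `expanded` are illustrative only and do not come from the original post. It uses `lead` over a window ordered by Date rather than `first` over a following-rows frame; for rows with distinct, non-null dates the two should pick the same next date.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a local session for experimenting; in spark-shell `spark` already exists
val spark = SparkSession.builder().master("local[*]").appName("expand-dates").getOrCreate()
import spark.implicits._

// The three rows from the question, with Date parsed from dd/MM/yyyy
val source = Seq(("1", "2", "01/01/2021"), ("A", "B", "05/01/2021"), ("M", "N", "10/01/2021"))
  .toDF("Column1", "Column2", "Date")
  .withColumn("Date", to_date($"Date", "dd/MM/yyyy"))

// No partitionBy: Spark moves all rows into a single partition for this window,
// which is fine for small data but will not scale to large inputs
val byDate = Window.orderBy("Date")

val expanded = source
  // Date of the following row, or null on the last row
  .withColumn("NextDate", lead("Date", 1).over(byDate))
  // Range end: the day before the next date, or the row's own date on the last row
  .withColumn("EndDate", coalesce(date_sub($"NextDate", 1), $"Date"))
  // One output row per day in [Date, EndDate]
  .withColumn("ExpandedDate", explode(sequence($"Date", $"EndDate")))
  // Format back to the dd/MM/yyyy strings used in the question
  .select($"Column1", $"Column2", date_format($"ExpandedDate", "dd/MM/yyyy").as("Date"))

expanded.show(false)
```

For the question's data this should produce 10 rows: four for (1, 2), five for (A, B) and one for (M, N). If two source rows share the same date, `EndDate` falls before `Date` and `sequence` counts backwards, so duplicates would need to be handled before expanding.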