In PySpark, how do I generate a new row for every month between a given start date and end date?

Time:10-31

In PySpark, how do I generate a new row for every month in the period between a given start date and end date? Say I have a start date column and an end date column, and there are 8 months between the dates. How can I generate 8 rows for those 8 months, with a new column holding the respective month value: 1 for Jan if the start date's month is January, 2 for Feb, and so on up to 8?

I tried using explode and array_repeat, which let me generate rows based on months_between() for every row, but that's not my desired result.

Answer:

There's a sequence function that creates an array from a start, an end and a step (similar to a list comprehension). You can explode that array to create rows.

Since the day of the start date and the day of the end date don't always match, you'll need to adjust the resulting array before the explode.

Here's an example:

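import pyspark.sql.functions as func

# data_sdf is assumed to have DateType columns start_dt and end_dt
data_sdf. \
    withColumn('mth_arr', func.expr('sequence(start_dt, end_dt, interval 1 month)')). \
    withColumn('mth_arr_new',
               # last element falls in an earlier month than end_dt: append end_dt
               # (trunc keeps the year, so December vs. the next January is handled)
               func.when(func.trunc(func.element_at('mth_arr', -1), 'month') < func.trunc('end_dt', 'month'),
                         func.flatten(func.array('mth_arr', func.array('end_dt')))
                         ).
               # same month as end_dt but an earlier day: swap the last element for end_dt
               when(func.element_at('mth_arr', -1) < func.col('end_dt'),
                    func.flatten(func.array(func.expr('slice(mth_arr, 1, size(mth_arr)-1)'), func.array('end_dt')))
                    ).
               otherwise(func.col('mth_arr'))
               ). \
    selectExpr('start_dt', 'end_dt', 'explode(mth_arr_new) as mths'). \
    show(100, truncate=False)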

# +----------+----------+----------+
# |start_dt  |end_dt    |mths      |
# +----------+----------+----------+
# |2020-01-01|2020-10-01|2020-01-01|
# |2020-01-01|2020-10-01|2020-02-01|
# |2020-01-01|2020-10-01|2020-03-01|
# |2020-01-01|2020-10-01|2020-04-01|
# |2020-01-01|2020-10-01|2020-05-01|
# |2020-01-01|2020-10-01|2020-06-01|
# |2020-01-01|2020-10-01|2020-07-01|
# |2020-01-01|2020-10-01|2020-08-01|
# |2020-01-01|2020-10-01|2020-09-01|
# |2020-01-01|2020-10-01|2020-10-01|
# |2020-01-10|2020-10-11|2020-01-10|
# |2020-01-10|2020-10-11|2020-02-10|
# |2020-01-10|2020-10-11|2020-03-10|
# |2020-01-10|2020-10-11|2020-04-10|
# |2020-01-10|2020-10-11|2020-05-10|
# |2020-01-10|2020-10-11|2020-06-10|
# |2020-01-10|2020-10-11|2020-07-10|
# |2020-01-10|2020-10-11|2020-08-10|
# |2020-01-10|2020-10-11|2020-09-10|
# |2020-01-10|2020-10-11|2020-10-11|
# |2020-01-10|2020-10-09|2020-01-10|
# |2020-01-10|2020-10-09|2020-02-10|
# |2020-01-10|2020-10-09|2020-03-10|
# |2020-01-10|2020-10-09|2020-04-10|
# |2020-01-10|2020-10-09|2020-05-10|
# |2020-01-10|2020-10-09|2020-06-10|
# |2020-01-10|2020-10-09|2020-07-10|
# |2020-01-10|2020-10-09|2020-08-10|
# |2020-01-10|2020-10-09|2020-09-10|
# |2020-01-10|2020-10-09|2020-10-09|
# |2020-01-01|2020-02-01|2020-01-01|
# |2020-01-01|2020-02-01|2020-02-01|
# +----------+----------+----------+

The array produced by sequence looks like this:

+----------+----------+------------------------------------------------------------------------------------------------------------------------+
|start_dt  |end_dt    |mth_arr                                                                                                                 |
+----------+----------+------------------------------------------------------------------------------------------------------------------------+
|2020-01-01|2020-10-01|[2020-01-01, 2020-02-01, 2020-03-01, 2020-04-01, 2020-05-01, 2020-06-01, 2020-07-01, 2020-08-01, 2020-09-01, 2020-10-01]|
|2020-01-10|2020-10-11|[2020-01-10, 2020-02-10, 2020-03-10, 2020-04-10, 2020-05-10, 2020-06-10, 2020-07-10, 2020-08-10, 2020-09-10, 2020-10-10]|
|2020-01-10|2020-10-09|[2020-01-10, 2020-02-10, 2020-03-10, 2020-04-10, 2020-05-10, 2020-06-10, 2020-07-10, 2020-08-10, 2020-09-10]            |
|2020-01-01|2020-02-01|[2020-01-01, 2020-02-01]                                                                                                |
+----------+----------+------------------------------------------------------------------------------------------------------------------------+
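For illustration only, here is a plain-Python approximation of what sequence(start_dt, end_dt, interval 1 month) returns for the sample rows, so no Spark session is needed. It assumes Spark builds element i by adding i months to the start date and stops before passing end_dt; the day clamping for short months is also an assumption. shift_months and month_sequence are hypothetical helper names, not Spark functions.

```python
import calendar
from datetime import date


def shift_months(d: date, n: int) -> date:
    """Add n calendar months to d, clamping the day to the target month's length."""
    year, month0 = divmod(d.year * 12 + d.month - 1 + n, 12)
    last_day = calendar.monthrange(year, month0 + 1)[1]
    return date(year, month0 + 1, min(d.day, last_day))


def month_sequence(start: date, end: date) -> list:
    """Approximate sequence(start, end, interval 1 month): start + i months while <= end."""
    out = []
    i = 0
    while True:
        current = shift_months(start, i)
        if current > end:
            # The next step would overshoot end, so the sequence stops here
            break
        out.append(current)
        i += 1
    return out


for start, end in [(date(2020, 1, 1), date(2020, 10, 1)),
                   (date(2020, 1, 10), date(2020, 10, 11)),
                   (date(2020, 1, 10), date(2020, 10, 9))]:
    seq = month_sequence(start, end)
    print(start, end, len(seq), seq[-1])
```

Because the step is anchored to the start date's day, a start of 2020-01-10 yields 2020-10-10 as its last element when the end is 2020-10-11, and stops at 2020-09-10 when the end is 2020-10-09, which is why the array needs adjusting.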

And here is the adjusted array:

 ---------- ---------- ------------------------------------------------------------------------------------------------------------------------ ------------------------------------------------------------------------------------------------------------------------ 
|start_dt  |end_dt    |mth_arr                                                                                                                 |mth_arr_new                                                                                                             |
 ---------- ---------- ------------------------------------------------------------------------------------------------------------------------ ------------------------------------------------------------------------------------------------------------------------ 
|2020-01-01|2020-10-01|[2020-01-01, 2020-02-01, 2020-03-01, 2020-04-01, 2020-05-01, 2020-06-01, 2020-07-01, 2020-08-01, 2020-09-01, 2020-10-01]|[2020-01-01, 2020-02-01, 2020-03-01, 2020-04-01, 2020-05-01, 2020-06-01, 2020-07-01, 2020-08-01, 2020-09-01, 2020-10-01]|
|2020-01-10|2020-10-11|[2020-01-10, 2020-02-10, 2020-03-10, 2020-04-10, 2020-05-10, 2020-06-10, 2020-07-10, 2020-08-10, 2020-09-10, 2020-10-10]|[2020-01-10, 2020-02-10, 2020-03-10, 2020-04-10, 2020-05-10, 2020-06-10, 2020-07-10, 2020-08-10, 2020-09-10, 2020-10-11]|
|2020-01-10|2020-10-09|[2020-01-10, 2020-02-10, 2020-03-10, 2020-04-10, 2020-05-10, 2020-06-10, 2020-07-10, 2020-08-10, 2020-09-10]            |[2020-01-10, 2020-02-10, 2020-03-10, 2020-04-10, 2020-05-10, 2020-06-10, 2020-07-10, 2020-08-10, 2020-09-10, 2020-10-09]|
|2020-01-01|2020-02-01|[2020-01-01, 2020-02-01]                                                                                                |[2020-01-01, 2020-02-01]                                                                                                |
 ---------- ---------- ------------------------------------------------------------------------------------------------------------------------ ------------------------------------------------------------------------------------------------------------------------ 
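The when/otherwise adjustment can be sketched the same way on plain Python lists. adjust_to_end is a hypothetical helper; it assumes the array is non-empty (start date on or before end date) and compares year and month together as a tuple, so a December element and a January end date in the following year count as different months (comparing month numbers alone would put 12 after 1).

```python
from datetime import date


def adjust_to_end(months: list, end: date) -> list:
    """Mirror the when/otherwise logic so the last element equals end."""
    last = months[-1]
    if (last.year, last.month) < (end.year, end.month):
        # The sequence stopped a month short of end: append end
        return months + [end]
    if last < end:
        # Same month as end but an earlier day: replace the last element with end
        return months[:-1] + [end]
    # The last element already equals end
    return months


raw = [date(2020, m, 10) for m in range(1, 10)]   # 2020-01-10 .. 2020-09-10
adjusted = adjust_to_end(raw, date(2020, 10, 9))
print(adjusted[-2:])
```

Each branch corresponds to one row of the table: the 2020-10-09 row takes the append branch, the 2020-10-11 row the replace branch, and the first-of-month rows fall through unchanged.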

Answer:

Option 1: Basic Python and df.unionByName

My first idea would be to solve this through basic Python operations. From a high-level perspective, this would look something like the following:

import pyspark.sql.functions as F

# Create a list of datetime.date objects, one per month you need
dates = [...]

# Transform to a Spark DataFrame (createDataFrame expects one tuple per row)
newdf = spark.createDataFrame([(d,) for d in dates], schema=['date'])
newdf = newdf.withColumn('date', F.to_date('date', format='yyyy-MM-dd'))

# Combine the dataframes (allowMissingColumns requires Spark 3.1+)
df = df.unionByName(newdf, allowMissingColumns=True)

# Perform the remaining operations as needed
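The elided dates list could be built with plain datetime arithmetic. The sketch below makes one assumption about what that list should contain: the first day of every month from the start date's month through the end date's month. month_starts is a hypothetical helper name; d.month then gives the 1-for-January month number the question asks for.

```python
from datetime import date


def month_starts(start: date, end: date) -> list:
    """Return the first day of every month from start's month through end's month."""
    dates = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        dates.append(date(year, month, 1))
        # Advance one calendar month, rolling over into the next year after December
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return dates


dates = month_starts(date(2020, 1, 15), date(2020, 8, 3))
month_numbers = [d.month for d in dates]
print(month_numbers)  # [1, 2, 3, 4, 5, 6, 7, 8]
```

To feed this into the snippet above, wrap each date in a tuple, e.g. spark.createDataFrame([(d,) for d in dates], schema=['date']), because createDataFrame expects one tuple (or Row) per row rather than bare date objects.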

Option 2: explode

If you want to (i) make this more dynamic, for example when you have multiple date ranges to impute, and (ii) stay within Spark, you could try the following:

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Window

# Demo data
data = [{'date': '2021-01-01'},
        {'date': '2021-02-01'},
        {'date': '2022-02-01'}
        ]
df = spark.createDataFrame(data)
df = df.withColumn('date', F.to_date(F.col('date'), format='yyyy-MM-dd'))

# Determine the next date and the number of whole months until it
df = df.withColumn('date_next', F.lead('date').over(Window.orderBy('date')))
df = df.withColumn('numberofmonths', F.months_between('date_next', 'date').cast(T.IntegerType()))
# The last row has no next date, so treat its gap as 0 months
df = df.fillna(0, subset=['numberofmonths'])

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def makemonths(numberofmonths):
    """Return the month offsets [0, 1, ..., numberofmonths - 1] for gaps longer than one month."""
    if numberofmonths > 1:
        return list(range(numberofmonths))
    return []

df = df.withColumn('months', makemonths(F.col('numberofmonths')))

# Create a new dataframe with an explode.
# F.explode_outer is chosen over F.explode to retain dates where months = []
newdf = df.select('date', F.explode_outer('months').alias('months'))
newdf = newdf.fillna(0, subset=['months'])
# add_months is called through expr so the offset can come from a column
# (older PySpark versions of F.add_months accept only a literal int for months)
newdf = newdf.withColumn('newdate', F.expr('add_months(date, months)'))

This results in the following two DataFrames:

>>> df.show()
+----------+----------+--------------+--------------------+
|      date| date_next|numberofmonths|              months|
+----------+----------+--------------+--------------------+
|2021-01-01|2021-02-01|             1|                  []|
|2021-02-01|2022-02-01|            12|[0, 1, 2, 3, 4, 5...|
|2022-02-01|      null|             0|                  []|
+----------+----------+--------------+--------------------+

>>> newdf.show()
+----------+------+----------+
|      date|months|   newdate|
+----------+------+----------+
|2021-01-01|     0|2021-01-01|
|2021-02-01|     0|2021-02-01|
|2021-02-01|     1|2021-03-01|
|2021-02-01|     2|2021-04-01|
|2021-02-01|     3|2021-05-01|
|2021-02-01|     4|2021-06-01|
|2021-02-01|     5|2021-07-01|
|2021-02-01|     6|2021-08-01|
|2021-02-01|     7|2021-09-01|
|2021-02-01|     8|2021-10-01|
|2021-02-01|     9|2021-11-01|
|2021-02-01|    10|2021-12-01|
|2021-02-01|    11|2022-01-01|
|2022-02-01|     0|2022-02-01|
+----------+------+----------+
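To check the row counts without a cluster, the lead / months_between / explode_outer / add_months pipeline can be emulated in plain Python. This is a sketch under two assumptions: the input dates are first-of-month dates, as in the demo data, so the month gap is a whole number (months_between returns fractions when the days differ), and adding months clamps the day to the target month's length. expand_gaps and shift_months are hypothetical names.

```python
import calendar
from datetime import date


def shift_months(d: date, n: int) -> date:
    """Add n calendar months to d, clamping the day to the target month's length."""
    year, month0 = divmod(d.year * 12 + d.month - 1 + n, 12)
    last_day = calendar.monthrange(year, month0 + 1)[1]
    return date(year, month0 + 1, min(d.day, last_day))


def expand_gaps(dates: list) -> list:
    """Return (date, offset, newdate) rows, one per month up to the next date."""
    dates = sorted(dates)
    rows = []
    for current, nxt in zip(dates, dates[1:] + [None]):
        # A missing next date behaves like fillna(0) on numberofmonths
        n = 0
        if nxt is not None:
            n = (nxt.year - current.year) * 12 + (nxt.month - current.month)
        # makemonths returns [] for n <= 1; explode_outer + fillna(0) keeps offset 0
        offsets = list(range(n)) if n > 1 else [0]
        rows.extend((current, k, shift_months(current, k)) for k in offsets)
    return rows


rows = expand_gaps([date(2021, 1, 1), date(2021, 2, 1), date(2022, 2, 1)])
for row in rows:
    print(*row)
```

Under these assumptions the rows line up with the newdf output: one row for 2021-01-01, twelve for 2021-02-01 and one for 2022-02-01.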