PySpark: How to get range of dates from dataframe into a new dataframe


I have this PySpark data frame with a single row:

spark_session_tbl_df.printSchema()

spark_session_tbl_df.show()   


root
 |-- strm: string (nullable = true)
 |-- acad_career: string (nullable = true)
 |-- session_code: string (nullable = true)
 |-- sess_begin_dt: timestamp (nullable = true)
 |-- sess_end_dt: timestamp (nullable = true)
 |-- census_dt: timestamp (nullable = true)

+----+-----------+------------+-------------------+-------------------+-------------------+
|strm|acad_career|session_code|      sess_begin_dt|        sess_end_dt|          census_dt|
+----+-----------+------------+-------------------+-------------------+-------------------+
|2228|       UGRD|           1|2022-08-20 00:00:00|2022-12-03 00:00:00|2022-09-19 00:00:00|
+----+-----------+------------+-------------------+-------------------+-------------------+

I am trying to output something like this where each row is a range/sequence of 7 days:

+-------------------+-------------------+
|      sess_begin_dt|        sess_end_dt|
+-------------------+-------------------+
|2022-08-20         |2022-08-27         |
|2022-08-28         |2022-09-04         |
|2022-09-05         |2022-09-12         |
|2022-09-13         |2022-09-20         |
|2022-09-21         |2022-09-28         |
|                ...|                ...|
|2022-11-26         |2022-12-03         |
+-------------------+-------------------+

I tried the code below, but I am not sure whether it can reference the PySpark DataFrame, or whether I need a different approach to achieve the desired output above.

from pyspark.sql.functions import sequence, to_date, explode, col

date_range_df = spark.sql("SELECT sequence(to_date('sess_begin_dt'), to_date('sess_end_dt'), interval 7 day) as date").withColumn("date", explode(col("date")))
date_range_df.show()
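
For reference, spark.sql can only see the DataFrame's columns once the DataFrame is registered as a temporary view; as written, to_date('sess_begin_dt') just parses the literal string. A rough sketch of that route (the view name temp_sessions is only illustrative, not from the original post):

# Sketch only: expose the DataFrame to Spark SQL via a temp view so the
# query can reference its columns; "temp_sessions" is a made-up name.
spark_session_tbl_df.createOrReplaceTempView("temp_sessions")

date_range_df = spark.sql("""
    SELECT explode(sequence(to_date(sess_begin_dt),
                            to_date(sess_end_dt),
                            interval 7 days)) AS date
    FROM temp_sessions
""")
date_range_df.show()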

CodePudding user response:

One approach when dealing with time series is to convert the dates to timestamps, solve the problem numerically, and at the end convert back to dates.

from pyspark.sql import functions as F

data = [['2022-08-20 00:00:00', '2022-12-03 00:00:00']]
df = spark.createDataFrame(data = data, schema = ['start', 'end'])

week_seconds = 7*24*60*60
(
    df
    .withColumn('start_timestamp', F.unix_timestamp('start'))
    .withColumn('end_timestamp', F.unix_timestamp('end'))
    .select(
        F.explode(
            F.sequence('start_timestamp', 'end_timestamp', F.lit(week_seconds)))
        .alias('start_date'))
    .withColumn('start_date', F.to_date(F.from_unixtime('start_date')))
    .withColumn('end_date', F.date_add('start_date', 6))
).show()

+----------+----------+
|start_date|  end_date|
+----------+----------+
|2022-08-20|2022-08-26|
|2022-08-27|2022-09-02|
|2022-09-03|2022-09-09|
|2022-09-10|2022-09-16|
|2022-09-17|2022-09-23|
+----------+----------+
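
The same idea should carry over to the original spark_session_tbl_df, since sess_begin_dt and sess_end_dt are already timestamp columns. Roughly (a sketch, not tested against the real table; the name weekly_ranges_df is just illustrative):

from pyspark.sql import functions as F

week_seconds = 7 * 24 * 60 * 60

# Sketch only: explode the session window into weekly start dates,
# then derive each week's end date from the start date.
weekly_ranges_df = (
    spark_session_tbl_df
    .withColumn('begin_ts', F.unix_timestamp('sess_begin_dt'))
    .withColumn('end_ts', F.unix_timestamp('sess_end_dt'))
    .select(
        F.explode(
            F.sequence('begin_ts', 'end_ts', F.lit(week_seconds)))
        .alias('sess_begin_dt'))
    .withColumn('sess_begin_dt', F.to_date(F.from_unixtime('sess_begin_dt')))
    .withColumn('sess_end_dt', F.date_add('sess_begin_dt', 6))
)
weekly_ranges_df.show(truncate=False)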