I have this PySpark DataFrame with a single row:
spark_session_tbl_df.printSchema()
spark_session_tbl_df.show()
root
|-- strm: string (nullable = true)
|-- acad_career: string (nullable = true)
|-- session_code: string (nullable = true)
|-- sess_begin_dt: timestamp (nullable = true)
|-- sess_end_dt: timestamp (nullable = true)
|-- census_dt: timestamp (nullable = true)
+----+-----------+------------+-------------------+-------------------+-------------------+
|strm|acad_career|session_code|      sess_begin_dt|        sess_end_dt|          census_dt|
+----+-----------+------------+-------------------+-------------------+-------------------+
|2228|       UGRD|           1|2022-08-20 00:00:00|2022-12-03 00:00:00|2022-09-19 00:00:00|
+----+-----------+------------+-------------------+-------------------+-------------------+
I am trying to output something like this, where each row is a 7-day range/sequence:
+-------------+-------------+
|sess_begin_dt|  sess_end_dt|
+-------------+-------------+
|   2022-08-20|   2022-08-27|
|   2022-08-28|   2022-09-04|
|   2022-09-05|   2022-09-12|
|   2022-09-13|   2022-09-20|
|   2022-09-21|   2022-09-28|
|          ...|          ...|
|   2022-11-26|   2022-12-03|
+-------------+-------------+
I tried the code below, but I am not sure whether it can reference the PySpark DataFrame or whether I need a different approach to achieve the desired output above.
from pyspark.sql.functions import sequence, to_date, explode, col
date_range_df = spark.sql("SELECT sequence(to_date('sess_begin_dt'), to_date('sess_end_dt'), interval 7 day) as date").withColumn("date", explode(col("date")))
date_range_df.show()
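As written, this query cannot see the DataFrame: spark.sql only resolves names that are registered as tables or views, and the quoted 'sess_begin_dt'/'sess_end_dt' are parsed as string literals rather than column references. A minimal sketch of the wiring that would be needed (the view name spark_session_tbl is an arbitrary choice; this yields only the start of each 7-day step, so the end dates would still have to be derived, as the answer below does):

from pyspark.sql.functions import explode, col

# register the DataFrame so spark.sql can reference it by name
spark_session_tbl_df.createOrReplaceTempView("spark_session_tbl")

date_range_df = spark.sql("""
    SELECT sequence(to_date(sess_begin_dt), to_date(sess_end_dt), interval 7 day) AS date
    FROM spark_session_tbl
""").withColumn("date", explode(col("date")))
date_range_df.show()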
CodePudding user response:
One approach when dealing with time series is to convert the dates to timestamps, solve the problem numerically, and at the end convert back to dates.
from pyspark.sql import functions as F

data = [['2022-08-20 00:00:00', '2022-12-03 00:00:00']]
df = spark.createDataFrame(data=data, schema=['start', 'end'])

week_seconds = 7 * 24 * 60 * 60  # one week expressed in seconds

(
    df
    # parse both bounds into seconds since the epoch
    .withColumn('start_timestamp', F.unix_timestamp('start'))
    .withColumn('end_timestamp', F.unix_timestamp('end'))
    # generate one row per 7-day step between start and end
    .select(
        F.explode(
            F.sequence('start_timestamp', 'end_timestamp', F.lit(week_seconds)))
        .alias('start_date'))
    # convert the epoch seconds back to a date
    .withColumn('start_date', F.to_date(F.from_unixtime('start_date')))
    # each range ends 6 days after it starts
    .withColumn('end_date', F.date_add('start_date', 6))
).show()
+----------+----------+
|start_date|  end_date|
+----------+----------+
|2022-08-20|2022-08-26|
|2022-08-27|2022-09-02|
|2022-09-03|2022-09-09|
|2022-09-10|2022-09-16|
|2022-09-17|2022-09-23|
+----------+----------+
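Applied to the original spark_session_tbl_df, whose sess_begin_dt and sess_end_dt columns are already timestamps, the same idea would look roughly like the sketch below; casting a timestamp column to long yields epoch seconds, so no string parsing is needed:

from pyspark.sql import functions as F

week_seconds = 7 * 24 * 60 * 60  # one week in seconds

(
    spark_session_tbl_df
    # timestamp columns cast to long become seconds since the epoch
    .withColumn('start_ts', F.col('sess_begin_dt').cast('long'))
    .withColumn('end_ts', F.col('sess_end_dt').cast('long'))
    # one row per 7-day step across the session
    .select(
        F.explode(
            F.sequence('start_ts', 'end_ts', F.lit(week_seconds)))
        .alias('sess_begin_dt'))
    .withColumn('sess_begin_dt', F.to_date(F.from_unixtime('sess_begin_dt')))
    .withColumn('sess_end_dt', F.date_add('sess_begin_dt', 6))
).show()

Note that date_add(..., 6) produces non-overlapping ranges (each one ends the day before the next begins), while the example in the question has each range ending 7 days after it starts; swap in F.date_add('sess_begin_dt', 7) if the shared-boundary version is what you want.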