Create PySpark dataframe with timeseries column


I have an initial PySpark dataframe from which I would like to take the MIN and MAX of a date column, and then create a new PySpark dataframe containing a daily timeseries between that MIN and MAX.

I will then join it with my initial dataframe to find the missing days (rows where the rest of my initial DF's columns are null).

I have tried to build the timeseries DF in many different ways, but I can't seem to make it work in PySpark. Any suggestions?

CodePudding user response:

The max value of a column can be extracted like this:

df.groupBy().agg(F.max('col_name')).head()[0]
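
If you need both the min and the max, they can also be fetched in a single aggregation (one Spark job instead of two); `df.agg(...)` is shorthand for `df.groupBy().agg(...)`:

# Get min and max of the date column in one pass
row = df.agg(F.min('col_name'), F.max('col_name')).head()
min_val, max_val = row[0], row[1]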

Date range df can be created like this:

df2 = spark.sql("SELECT sequence(to_date('2000-01-01'), to_date('2000-02-02'), interval 1 day) as date_col").withColumn('date_col', F.explode('date_col'))
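
The same range can also be built without an SQL string, using the DataFrame API directly (a sketch using `F.sequence`, available since Spark 2.4):

# One-row dataframe, exploded into one row per day in the range
df2 = spark.range(1).select(
    F.explode(
        F.sequence(
            F.lit('2000-01-01').cast('date'),  # range start
            F.lit('2000-02-02').cast('date'),  # range end
            F.expr('interval 1 day')           # daily step
        )
    ).alias('date_col')
)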

And then join.


Full example:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1, '2022-04-01'),(2, '2022-04-05')], ['id', 'df1_date']).select('id', F.col('df1_date').cast('date'))
df1.show()
# +---+----------+
# | id|  df1_date|
# +---+----------+
# |  1|2022-04-01|
# |  2|2022-04-05|
# +---+----------+

min_date = df1.groupBy().agg(F.min('df1_date')).head()[0]
max_date = df1.groupBy().agg(F.max('df1_date')).head()[0]

df2 = spark.sql(f"SELECT sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day) as df2_date").withColumn('df2_date', F.explode('df2_date'))

df3 = df2.join(df1, df1.df1_date == df2.df2_date, 'left')

df3.show()
# +----------+----+----------+
# |  df2_date|  id|  df1_date|
# +----------+----+----------+
# |2022-04-01|   1|2022-04-01|
# |2022-04-02|null|      null|
# |2022-04-03|null|      null|
# |2022-04-04|null|      null|
# |2022-04-05|   2|2022-04-05|
# +----------+----+----------+
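
Since the goal is to find the missing days, the null rows of df3 can then be filtered out, for example:

# Keep only the dates that had no match in df1
missing_days = df3.filter(F.col('df1_date').isNull()).select('df2_date')
missing_days.show()
# +----------+
# |  df2_date|
# +----------+
# |2022-04-02|
# |2022-04-03|
# |2022-04-04|
# +----------+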