I have a dataframe date_dataframe in PySpark with monthly frequency:
date_dataframe
from_date, to_date
2021-01-01, 2022-01-01
2021-02-01, 2022-02-01
2021-03-01, 2022-03-01
Using this dataframe, I want to filter another dataframe that has millions of records (daily frequency), group it by id, and aggregate to calculate the average value.
data_df
id,p_date,value
1, 2021-03-25, 10
1, 2021-03-26, 5
1, 2021-03-27, 7
2, 2021-03-25, 5
2, 2021-03-26, 7
2, 2021-03-27, 8
3, 2021-03-25, 20
3, 2021-03-26, 23
3, 2021-03-27, 17
.
.
.
10, 2022-03-25, 5
12, 2022-03-25, 6
- I want to use date_dataframe to query (filter) data_df.
- Group the filtered dataframe by id.
- Finally, aggregate to calculate the average value.
I have tried the code below to do this.
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

SeriesAppend = []
for row in date_dataframe.collect():  # bring the (small) date ranges to the driver
    df_new = (data_df
              .filter((data_df.p_date >= row["from_date"]) & (data_df.p_date < row["to_date"]))
              .groupBy("id")
              .agg(F.min("p_date"), F.max("p_date"), F.avg("value")))
    SeriesAppend.append(df_new)

df_series = reduce(DataFrame.unionAll, SeriesAppend)
Is there a more optimized way to do this in PySpark without using a for loop?
Also, date_dataframe is nothing but the start of each month as the start date, with the end date being the start date plus one year. I am okay with having a different format for date_dataframe.
CodePudding user response:
You can use the SQL function sequence to expand your ranges into actual date rows. Then you can use a join to complete the work. Here I changed the name of the column to_date to end_date, since to_date is a SQL function name and I didn't want to deal with the hassle.
from pyspark.sql import functions as F

# Expand each from_date .. end_date range into one row per day.
df_sequence = date_dataframe.select(
    F.explode(F.expr("sequence(to_date(from_date), to_date(end_date), interval 1 day)")).alias("day"))

# Join the daily records to the expanded dates, then group by id and average.
df_result = (df_sequence.join(data_df, data_df.p_date == df_sequence.day, "left")
             .groupBy("id")
             .agg(F.min("p_date"), F.max("p_date"), F.avg("value")))
This should parallelize the work instead of using a for loop.
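For reference, here is a minimal end-to-end sketch of the same approach, assuming an active SparkSession and made-up sample rows (the column names match the question, with to_date renamed to end_date):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy inputs with the same schemas as in the question (to_date renamed to end_date).
date_dataframe = spark.createDataFrame(
    [("2021-03-01", "2022-03-01")], ["from_date", "end_date"])
data_df = spark.createDataFrame(
    [(1, "2021-03-25", 10), (1, "2021-03-26", 5), (2, "2021-03-25", 5)],
    ["id", "p_date", "value"])

# One row per calendar day covered by any range.
df_sequence = date_dataframe.select(
    F.explode(F.expr("sequence(to_date(from_date), to_date(end_date), interval 1 day)")).alias("day"))

# Join, group by id, and average. p_date is cast to date here because the toy data uses strings.
# The left join keeps days with no records (they land in an id = NULL group); use "inner" to drop them.
(df_sequence.join(data_df, F.to_date(data_df.p_date) == df_sequence.day, "left")
    .groupBy("id")
    .agg(F.min("p_date"), F.max("p_date"), F.avg("value"))
    .show())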