PySpark filter using another dataframe having to and from date and group by using ids


I have a dataframe date_dataframe in PySpark with monthly frequency:

date_dataframe

from_date, to_date
2021-01-01, 2022-01-01
2021-02-01, 2022-02-01
2021-03-01, 2022-03-01

Using date_dataframe, I want to filter another dataframe that has millions of records (daily frequency), then group the filtered rows by id and aggregate to calculate the average value.

data_df

id,p_date,value
1, 2021-03-25, 10
1, 2021-03-26, 5
1, 2021-03-27, 7
2, 2021-03-25, 5
2, 2021-03-26, 7
2, 2021-03-27, 8
3, 2021-03-25, 20
3, 2021-03-26, 23
3, 2021-03-27, 17
.
.
.
10, 2022-03-25, 5
12, 2022-03-25, 6
  1. Use date_dataframe to filter data_df.
  2. Group the filtered dataframe by id.
  3. Finally, aggregate to calculate the average value.

I have tried the code below to do this.

from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

SeriesAppend = []

# Collect the date ranges to the driver, then filter and aggregate once per range
for row in date_dataframe.collect():
    df_new = (
        data_df.filter((data_df.p_date >= row["from_date"]) & (data_df.p_date < row["to_date"]))
        .groupBy("id")
        .agg(F.min("p_date"), F.max("p_date"), F.avg("value"))
    )
    SeriesAppend.append(df_new)

df_series = reduce(DataFrame.unionAll, SeriesAppend)

Is there a more optimized way to do this in PySpark without using a for loop?

Also, date_dataframe is nothing but the start of each month as the start date, with the end date being the start date plus one year. I am okay with having a different format for date_dataframe.
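
For what it's worth, since the ranges are just month starts with a one-year window, date_dataframe can also be generated directly in Spark instead of being maintained by hand. Below is a minimal sketch, assuming the column names from_date/to_date used above and the 2021 month starts shown in the sample (the start year is an assumption):

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, explode, add_months, col

spark = SparkSession.builder.getOrCreate()

# Assumed range: month starts for 2021; to_date is from_date plus 12 months
date_dataframe = (
    spark.range(1)
    .select(
        explode(
            expr("sequence(to_date('2021-01-01'), to_date('2021-12-01'), interval 1 month)")
        ).alias("from_date")
    )
    .withColumn("to_date", add_months(col("from_date"), 12))
)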

CodePudding user response:

You can use the SQL function sequence to expand your ranges into actual date rows, then use a join to complete the work. Here I renamed the column to_date to end_date, since to_date is a SQL function name and I didn't want to deal with the hassle.

from pyspark.sql.functions import explode, expr, avg

df_sequence = date_dataframe.select(
    explode(
        expr("sequence(to_date(from_date), to_date(end_date), interval 1 day)")
    ).alias("day")
)

df_sequence.join(data_df, data_df.p_date == df_sequence.day, "left") \
    .groupBy("id").agg(avg("value"))

This should parallelize the work instead of using a for loop.
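
Putting the pieces together, here is a minimal end-to-end sketch (assuming the columns id, p_date, value from the question and the renamed end_date column). It keeps from_date through the join so each one-year window gets its own average per id, matching what the for-loop version produced:

from pyspark.sql.functions import explode, expr, avg, min as min_, max as max_

# Expand each (from_date, end_date) range into one row per day,
# keeping from_date so each window stays identifiable
df_sequence = date_dataframe.select(
    "from_date",
    explode(
        expr("sequence(to_date(from_date), to_date(end_date), interval 1 day)")
    ).alias("day"),
)

# Join the daily data onto the expanded ranges, then aggregate per window and id
df_series = (
    df_sequence.join(data_df, data_df.p_date == df_sequence.day, "inner")
    .groupBy("from_date", "id")
    .agg(min_("p_date"), max_("p_date"), avg("value"))
)

An inner join is used in this sketch so ids with no rows inside a window simply drop out; the left join above works as well, it just keeps days with no matching data as null rows before the aggregation.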
