I have a custom function that works with a pandas DataFrame groupby:
import statistics as math   # "math" here is the statistics module, aliased as below
import pandas as pd
from datetime import date, timedelta

def avg_df(df, weekss):
    """
    1. Take a data frame and the averaging window (in weeks).
    2. Roll a forward window starting from the data one year back and compute the
       average over the given window. E.g., for a 6 Dec 2021 to 6 Dec 2022 prediction,
       start the 12-week rolling average at 6 Dec 2020 and roll toward 6 Dec 2021.
    3. Generate future dates of the same length.
    4. Return the prepared data frame.
    """
    future_date = []
    list1 = list(df.units)
    for i in range(1, 54):
        avg = math.mean(list1[-(53 + weekss) + i:])
        list1.append(avg)
    for i in range(53):
        future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
    data = pd.DataFrame({'date': list(df.date.dt.date) + future_date, 'units': list1})
    return data
It works when used with pandas as shown:
df = df11.groupby(['customer_name','upc']).apply(avg_df, weekss=12).reset_index(inplace=False)
However, I need to change it to make it work with PySpark. I tried making changes, but it isn't working.
Passing a parameter to apply with PySpark gives the following error:
TypeError: apply() got an unexpected keyword argument 'weekss'
I looked for a similar solution; this answer is too simple to be used in my case.
Please use this for data frame generation:
df = pd.DataFrame({'date': ['2021-1-6', '2021-3-13', '2021-6-20', '2021-10-27', '2021-1-6', '2021-3-13', '2021-6-6', '2021-10-6'],
                   'customer_name': ['a1', 'a1', 'a1', 'a1', 'a1', 'a2', 'a2', 'a2'],
                   'upc': ['b1', 'b1', 'b5', 'b5', 'b2', 'b2', 'b4', 'b4'],
                   'average_weekly_acv_distribution': [6, 0, 0, 7, 2, 9, 3, 8],
                   'units': [8, 0, 0, 8, 1, 9, 3, 8]})
df['date'] = pd.to_datetime(df['date'])
df = spark.createDataFrame(df)
I looked up applyInPandas() for PySpark, but it doesn't allow passing any extra arguments to the function.
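For reference, a rough sketch of the signature difference I'm hitting (assuming Spark 3.x and the sample df above); since applyInPandas takes only the function and an output schema, weekss would have to be bound into the function somehow:

# pandas: GroupBy.apply(func, *args, **kwargs) forwards extra arguments to func
pdf = df.toPandas()
pdf.groupby(['customer_name', 'upc']).apply(avg_df, weekss=12)

# PySpark: GroupedData.applyInPandas(func, schema) accepts only a one-argument
# function and the output schema, so there is no place to pass weekss directly
# df.groupBy('customer_name', 'upc').applyInPandas(<one-argument func>, <output schema>)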
CodePudding user response:
First, we need to define the schema of the output of the custom function:
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType

schema = StructType([
    StructField("units", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("upc", StringType(), True),
    StructField("customer_name", StringType(), True),
])
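Side note (not from the original answer): applyInPandas also accepts the output schema as a DDL-formatted string, which can be a more compact equivalent if you prefer:

# Alternative: the same output schema as a DDL-formatted string (a sketch)
schema_ddl = "units int, date date, upc string, customer_name string"
# applyInPandas accepts either form, e.g. ...applyInPandas(avg_df_12_weeks, schema=schema_ddl)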
and update the custom function:
from statistics import mean
from datetime import date, timedelta
import pandas as pd

def avg_df_12_weeks(df: pd.DataFrame) -> pd.DataFrame:
    weekss = 12
    upc = str(df["upc"].iloc[0])
    customer_name = str(df["customer_name"].iloc[0])
    future_date = []
    list1 = list(df.units)
    for i in range(1, 54):
        avg = mean(list1[-(53 + weekss) + i:])
        list1.append(avg)
    for i in range(53):
        future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
    df = pd.DataFrame({'date': list(df.date.dt.date) + future_date, 'units': list1,
                       'customer_name': customer_name, 'upc': upc})
    return df
Then, finally, group by, call applyInPandas, and pass the schema as a parameter:
df_sales_grouped.groupBy('customer_name','upc').applyInPandas(avg_df_12_weeks, schema = schema)
Drawback: you are not allowed to pass parameters to the custom function.
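One way around that drawback (a sketch, not part of the original answer) is a small factory function that closes over weekss and returns a one-argument function that applyInPandas will accept; the next answer achieves the same thing with functools.partial:

# Sketch: bind weekss via a closure instead of hard-coding it inside the function.
# Assumes the two-argument avg_df(df, weekss) from the question and the schema above.
def make_avg_df(weekss):
    def avg_df_fixed(pdf):
        return avg_df(pdf, weekss)
    return avg_df_fixed

df_sales_grouped.groupBy('customer_name', 'upc').applyInPandas(make_avg_df(12), schema=schema)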
CodePudding user response:
Building on top of the answer from @DileepKumar, the avg_df can be partially applied using partial, passing in the weekss param. The resulting function accepts only the dataframe and can be used in applyInPandas.
from pyspark.sql.types import *
schema = StructType([
    StructField("units", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("upc", StringType(), True),
    StructField("customer_name", StringType(), True),
])
import statistics as math
import pandas as pd
from datetime import date, timedelta

def avg_df(df: pd.DataFrame, weekss) -> pd.DataFrame:
    upc = str(df["upc"].iloc[0])
    customer_name = str(df["customer_name"].iloc[0])
    future_date = []
    list1 = list(df.units)
    for i in range(1, 54):
        avg = math.mean(list1[-(53 + weekss) + i:])
        list1.append(avg)
    for i in range(53):
        future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
    df = pd.DataFrame(
        {'date': list(df.date.dt.date) + future_date, 'units': list1,
         'customer_name': customer_name, 'upc': upc})
    return df
from functools import partial
df.groupBy('customer_name','upc').applyInPandas(partial(avg_df, weekss = 12), schema = schema).show()
"""
----- ---------- --- -------------
|units| date|upc|customer_name|
----- ---------- --- -------------
| 8|2021-01-06| b1| a1|
| 0|2021-03-13| b1| a1|
| 4|2022-01-03| b1| a1|
| 4|2022-01-10| b1| a1|
| 4|2022-01-17| b1| a1|
| 4|2022-01-24| b1| a1|
| 4|2022-01-31| b1| a1|
| 4|2022-02-07| b1| a1|
| 4|2022-02-14| b1| a1|
| 4|2022-02-21| b1| a1|
| 4|2022-02-28| b1| a1|
| 4|2022-03-07| b1| a1|
| 4|2022-03-14| b1| a1|
| 4|2022-03-21| b1| a1|
| 4|2022-03-28| b1| a1|
| 4|2022-04-04| b1| a1|
| 4|2022-04-11| b1| a1|
| 4|2022-04-18| b1| a1|
| 4|2022-04-25| b1| a1|
| 4|2022-05-02| b1| a1|
----- ---------- --- -------------
only showing top 20 rows
"""