pyspark groupby and apply a custom function

Time:12-30

I have a custom function that works with a pandas DataFrame groupby:

import statistics as math
from datetime import date, timedelta
import pandas as pd

def avg_df(df, weekss):
  """
  1. Take a data frame and the averaging window (in weeks).
  2. Roll a window forward starting from data one year back and calculate the average over the given window, e.g. for a 6th Dec 2021 to 6th Dec 2022 prediction, start a 12-week rolling average at 6th Dec 2020 and roll toward 6th Dec 2021.
  3. Generate future dates of the same length.
  4. Return the prepared data frame.
  """
  future_date = []
  list1 = list(df.units)
  for i in range(1, 54):
    # Mean of the last (53 + weekss - i) values; a shorter list is used whole
    avg = math.mean(list1[-(53 + weekss) + i:])
    list1.append(avg)
  for i in range(53):
    # Weekly dates, starting with the first Monday after today
    future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
  data = pd.DataFrame({'date': list(df.date.dt.date) + future_date, 'units': list1})
  return data
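
To see what the slice in step 2 does, here is a minimal pure-Python sketch of just the extension loop. The helper name `extend_with_tail_mean` and its `horizon` parameter are hypothetical, introduced only for illustration; the slice expression mirrors the one in avg_df.

```python
from statistics import mean

def extend_with_tail_mean(values, weekss, horizon=53):
    """Append `horizon` forecasts, each the mean of a tail slice of the series so far."""
    series = list(values)
    for i in range(1, horizon + 1):
        # At step i the slice covers the last (horizon + weekss - i) items,
        # so it shrinks by one per step and ends at `weekss` items.
        # A list shorter than the slice is simply used whole.
        start = -(horizon + weekss) + i
        series.append(mean(series[start:]))
    return series

history = [8, 0]  # units for customer a1, upc b1 in the sample data
forecast = extend_with_tail_mean(history, weekss=12)
print(len(forecast))   # 55: 2 original values + 53 appended ones
print(forecast[:5])    # [8, 0, 4, 4, 4]
```

With only two historical values, every appended value is the mean of a list that already averages 4, which is why a short group produces a flat forecast.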

It works when used with pandas, as shown:

df = df11.groupby(['customer_name','upc']).apply(avg_df, weekss=12).reset_index(inplace=False)

However, I need to make it work with PySpark. I tried several changes, but none of them worked.

Passing a parameter to apply in PySpark gives the following error:

TypeError: apply() got an unexpected keyword argument 'weekss'

I looked for similar solutions, but the answer I found is too simple for my case.

Please use this to generate the data frame:

df = pd.DataFrame({'date':['2021-1-6','2021-3-13','2021-6-20','2021-10-27','2021-1-6','2021-3-13','2021-6-6','2021-10-6'],
                   'customer_name':['a1','a1','a1','a1','a1','a2','a2','a2'],
                   'upc':['b1','b1','b5','b5','b2','b2','b4','b4'],
                   'average_weekly_acv_distribution':[6,0,0,7,2,9,3,8],
                   'units':[8,0,0,8,1,9,3,8]})
df['date'] = pd.to_datetime(df['date'])
df = spark.createDataFrame(df)

I looked into applyInPandas() for PySpark, but it doesn't accept any extra arguments for the function.

CodePudding user response:

First, we need to define the schema of the custom function's output:

from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType

schema = StructType([
    StructField("units", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("upc", StringType(), True),
    StructField("customer_name", StringType(), True),
])

Then update the custom function:

from statistics import mean
from datetime import date, timedelta
import pandas as pd

def avg_df_12_weeks(df: pd.DataFrame) -> pd.DataFrame:
  weekss = 12  # window is hard-coded because the function takes only the group
  # Each group has a single upc and customer_name, so read them from the first row
  upc = str(df["upc"].iloc[0])
  customer_name = str(df["customer_name"].iloc[0])
  future_date = []
  list1 = list(df.units)
  for i in range(1, 54):
    avg = mean(list1[-(53 + weekss) + i:])
    list1.append(avg)
  for i in range(53):
    future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
  df = pd.DataFrame({'date': list(df.date.dt.date) + future_date, 'units': list1, 'customer_name': customer_name, 'upc': upc})
  return df

Finally, call groupBy and applyInPandas, passing the schema as a parameter:

df_sales_grouped.groupBy('customer_name','upc').applyInPandas(avg_df_12_weeks, schema = schema)

Drawback: applyInPandas does not pass extra parameters to the custom function, so the window (weekss) is hard-coded.
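
One way around this drawback is to build the single-argument function at call time with a closure, so the window stays configurable. The sketch below shows only the pattern in plain Python: `make_window_mean` is a hypothetical helper, and `call_with_one_argument` is a stand-in for a caller that passes only the group's data, not a real PySpark API.

```python
from statistics import mean

def make_window_mean(weekss):
    """Return a one-argument function that remembers `weekss` (hypothetical helper)."""
    def window_mean(units):
        # Only `units` is passed at call time; `weekss` comes from the enclosing scope.
        return mean(units[-weekss:])
    return window_mean

def call_with_one_argument(func, group):
    """Stand-in for a caller that supplies the group data and nothing else."""
    return func(group)

units = [8, 0, 0, 8, 1, 9, 3, 8]  # the `units` column of the sample data
print(call_with_one_argument(make_window_mean(4), units))  # 5.25, mean of [1, 9, 3, 8]
print(call_with_one_argument(make_window_mean(2), units))  # 5.5, mean of [3, 8]
```

The same pattern should carry over to a factory that returns a function taking and returning a pandas DataFrame.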

CodePudding user response:

Building on the answer from @DileepKumar, avg_df can be partially applied with functools.partial, binding the weekss parameter. The resulting function accepts only the data frame and can be used with applyInPandas.

from pyspark.sql.types import *

schema = StructType([ \
    StructField("units", IntegerType(), True), \
    StructField("date", DateType(), True), \
    StructField("upc", StringType(), True), \
    StructField("customer_name", StringType(), True), \
  ])

import statistics as math
from datetime import date, timedelta
import pandas as pd

def avg_df(df: pd.DataFrame, weekss) -> pd.DataFrame:
    # Each group has a single upc and customer_name, so read them from the first row
    upc = str(df["upc"].iloc[0])
    customer_name = str(df["customer_name"].iloc[0])
    future_date = []
    list1 = list(df.units)
    for i in range(1, 54):
        avg = math.mean(list1[-(53 + weekss) + i:])
        list1.append(avg)
    for i in range(53):
        future_date.append(date.today() + timedelta(days=7 - date.today().weekday()) + timedelta(weeks=i))
    df = pd.DataFrame(
        {'date': list(df.date.dt.date) + future_date, 'units': list1, 'customer_name': customer_name, 'upc': upc})
    return df

from functools import partial

df.groupBy('customer_name','upc').applyInPandas(partial(avg_df, weekss = 12), schema = schema).show()

"""
+-----+----------+---+-------------+
|units|      date|upc|customer_name|
+-----+----------+---+-------------+
|    8|2021-01-06| b1|           a1|
|    0|2021-03-13| b1|           a1|
|    4|2022-01-03| b1|           a1|
|    4|2022-01-10| b1|           a1|
|    4|2022-01-17| b1|           a1|
|    4|2022-01-24| b1|           a1|
|    4|2022-01-31| b1|           a1|
|    4|2022-02-07| b1|           a1|
|    4|2022-02-14| b1|           a1|
|    4|2022-02-21| b1|           a1|
|    4|2022-02-28| b1|           a1|
|    4|2022-03-07| b1|           a1|
|    4|2022-03-14| b1|           a1|
|    4|2022-03-21| b1|           a1|
|    4|2022-03-28| b1|           a1|
|    4|2022-04-04| b1|           a1|
|    4|2022-04-11| b1|           a1|
|    4|2022-04-18| b1|           a1|
|    4|2022-04-25| b1|           a1|
|    4|2022-05-02| b1|           a1|
+-----+----------+---+-------------+
only showing top 20 rows
"""
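
The forecast dates in the output start at 2022-01-03 because avg_df anchors them on the first Monday after the day the code runs. The sketch below reproduces that arithmetic with the run date passed in explicitly. The helper name `future_mondays` and the run date of 2021-12-30 are assumptions chosen to match the output shown, not something stated in the answer.

```python
from datetime import date, timedelta

def future_mondays(today, count):
    """Return `count` consecutive Mondays, starting with the first one after `today`."""
    # weekday() is 0 for Monday, so 7 - weekday() always lands on the next Monday
    # (a full week ahead when `today` is itself a Monday).
    first = today + timedelta(days=7 - today.weekday())
    return [first + timedelta(weeks=i) for i in range(count)]

run_date = date(2021, 12, 30)  # assumed run date (a Thursday)
for d in future_mondays(run_date, 3):
    print(d.isoformat())
# 2022-01-03
# 2022-01-10
# 2022-01-17
```

Passing the date in, rather than calling date.today() inside the function, also makes the result reproducible across runs.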