Vectorized pandas udf in pyspark with dict lookup


I'm trying to learn to use pandas_udf in pyspark (Databricks).

One of the assignments is to write a pandas_udf to sort by day of the week. I know how to do this using spark udf:

from pyspark.sql.functions import *

data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)
print('Original')
df.show()


@udf()
def udf(day: str) -> str:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day] + '-' + day

print('with spark udf')
final_df = df.select(col('avg_users'), udf(col('day')).alias('day')).sort('day')
final_df.show()

Prints:

Original
+---+-----------+
|day|  avg_users|
+---+-----------+
|Sun|   282905.5|
|Mon|   238195.5|
|Thu|   264620.0|
|Sat|   278482.0|
|Wed|   227214.0|
+---+-----------+

with spark udf
+-----------+-----+
|  avg_users|  day|
+-----------+-----+
|   238195.5|1-Mon|
|   227214.0|3-Wed|
|   264620.0|4-Thu|
|   278482.0|6-Sat|
|   282905.5|7-Sun|
+-----------+-----+

Trying to do the same with pandas_udf

import pandas as pd


@pandas_udf('string')
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day.str] + '-' + day.str


p_final_df = df.select(df.avg_users, p_udf(df.day))

print('with pandas udf')
p_final_df.show()

I get KeyError: <pandas.core.strings.accessor.StringMethods object at 0x7f31197cd9a0>. I think it's coming from dow[day.str], which kinda makes sense.
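The error is reproducible in plain pandas, outside Spark entirely; a minimal sketch (note that Series.map, rather than dict indexing, is what does an elementwise lookup):

```python
import pandas as pd

day = pd.Series(["Sun", "Mon", "Thu"])
dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
       "Fri": "5", "Sat": "6", "Sun": "7"}

# .str is an accessor *object* (StringMethods), not the string contents,
# so using it as a dict key raises exactly this KeyError
print(type(day.str).__name__)  # StringMethods

# Series.map looks each element up in the dict, elementwise
print(day.map(dow) + '-' + day)
```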

I also tried:

return dow[day.str.__str__()] + '-' + day.str  # KeyError: .... StringMethods
return dow[str(day.str)] + '-' + day.str       # KeyError: .... StringMethods
return dow[day.str.upper()] + '-' + day.str    # TypeError: unhashable type: 'Series'
return f"{dow[day.str]}-{day.str}"             # KeyError: .... StringMethods (but I think this is logically
                                               # wrong, returning a string instead of a Series)


CodePudding user response:

What about returning a dataframe using grouped data (applyInPandas) and doing the orderBy after the udf? Pandas sort_values is quite problematic within udfs.

Basically, in the udf I generate the numbers using python and then concatenate them back to the day column.

from pyspark.sql.functions import pandas_udf
import pandas as pd
from pyspark.sql.types import *
import calendar

def sortdf(pdf):
    day = pdf.day
    # number the days via calendar.day_abbr (Mon=0 .. Sun=6), shift to 1..7,
    # then prefix the number onto the day name
    pdf = pdf.assign(day=(day.map(dict(zip(calendar.day_abbr, range(7)))) + 1).astype(str) + '-' + day)
    return pdf

df.groupby('avg_users').applyInPandas(sortdf, schema=df.schema).show()

+-----+---------+
|  day|avg_users|
+-----+---------+
|3-Wed| 227214.0|
|1-Mon| 238195.5|
|4-Thu| 264620.0|
|6-Sat| 278482.0|
|7-Sun| 282905.5|
+-----+---------+
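The calendar.day_abbr trick above can be checked in plain Python (note it is locale-dependent; the English abbreviations below assume the default C locale):

```python
import calendar

# calendar.day_abbr iterates 'Mon' .. 'Sun', so zipping with range(7)
# numbers the days Mon=0 .. Sun=6
dow = dict(zip(calendar.day_abbr, range(7)))
print(dow["Mon"], dow["Sun"])  # 0 6

# the udf adds 1 to get the 1..7 numbering used in the question
print(dow["Sun"] + 1)  # 7
```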

CodePudding user response:

Using the .str accessor alone, without any actual vectorized transformation, was giving you the error. Also, you cannot use the whole Series as a key for your dow dict. Use the map method of pandas.Series:

from pyspark.sql.functions import *
import pandas as pd

data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)

@pandas_udf("string")
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return day.map(dow) + '-' + day

df.select(df.avg_users, p_udf(df.day).alias("day")).show()

+---------+-----+
|avg_users|  day|
+---------+-----+
| 282905.5|7-Sun|
| 238195.5|1-Mon|
| 264620.0|4-Thu|
| 278482.0|6-Sat|
| 227214.0|3-Wed|
+---------+-----+
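To finish the original exercise (sorting by day of week), a plain .sort('day') on this result is enough: because the prefixes are single digits, a lexicographic string sort coincides with weekday order. A plain-pandas sketch of that last step:

```python
import pandas as pd

df = pd.DataFrame({"avg_users": [282905.5, 238195.5, 264620.0, 278482.0, 227214.0],
                   "day": ["7-Sun", "1-Mon", "4-Thu", "6-Sat", "3-Wed"]})

# single-digit prefixes make a plain string sort equal to weekday order
print(df.sort_values("day"))
```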