I'm trying to learn to use pandas_udf
in pyspark (Databricks).
One of the assignments is to write a pandas_udf
to sort by day of the week. I know how to do this using spark udf:
from pyspark.sql.functions import *
data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)
print('Original')
df.show()
@udf()
def udf(day: str) -> str:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day] + '-' + day
print('with spark udf')
final_df = df.select(col('avg_users'), udf(col('day')).alias('day')).sort('day')
final_df.show()
Prints:
Original
+---+---------+
|day|avg_users|
+---+---------+
|Sun| 282905.5|
|Mon| 238195.5|
|Thu| 264620.0|
|Sat| 278482.0|
|Wed| 227214.0|
+---+---------+
with spark udf
+---------+-----+
|avg_users|  day|
+---------+-----+
| 238195.5|1-Mon|
| 227214.0|3-Wed|
| 264620.0|4-Thu|
| 278482.0|6-Sat|
| 282905.5|7-Sun|
+---------+-----+
Trying to do the same with pandas_udf
import pandas as pd
@pandas_udf('string')
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day.str] + '-' + day.str
p_final_df = df.select(df.avg_users, p_udf(df.day))
print('with pandas udf')
p_final_df.show()
I get KeyError: <pandas.core.strings.accessor.StringMethods object at 0x7f31197cd9a0>. I think it's coming from dow[day.str], which kinda makes sense.
I also tried:
return dow[day.str.__str__()] + '-' + day.str  # KeyError: .... StringMethods
return dow[str(day.str)] + '-' + day.str       # KeyError: .... StringMethods
return dow[day.str.upper()] + '-' + day.str    # TypeError: unhashable type: 'Series'
return f"{dow[day.str]}-{day.str}"             # KeyError: .... StringMethods (but I think this is
                                               # logically wrong, returning a string instead of a Series)
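The failures above can be reproduced outside Spark entirely. A minimal pandas-only sketch (with made-up sample values): day.str is a StringMethods accessor object, not the string contents of a row, so it can never match a dict key; element-wise dict lookup is what Series.map does.

```python
import pandas as pd

# `.str` is an accessor object, not the string contents of the Series:
day = pd.Series(["Sun", "Mon", "Thu"])
print(type(day).__name__, type(day.str).__name__)  # Series StringMethods

dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
       "Fri": "5", "Sat": "6", "Sun": "7"}
# dow[day.str] raises KeyError: the accessor object is being used as a dict key.
# Element-wise dict lookup is what Series.map does:
print((day.map(dow) + "-" + day).tolist())  # ['7-Sun', '1-Mon', '4-Thu']
```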
I've read:
- API reference
- PySpark equivalent for lambda function in Pandas UDF
- How to convert Scalar Pyspark UDF to Pandas UDF?
- Pandas UDF in pyspark
CodePudding user response:
What about returning a DataFrame via applyInPandas on grouped data, and doing the ordering after the UDF? pandas sort_values is quite problematic inside UDFs.
Basically, in the UDF I generate the day numbers in Python and then concatenate them back onto the day column.
from pyspark.sql.functions import pandas_udf
import pandas as pd
from pyspark.sql.types import *
import calendar
def sortdf(pdf):
    day = pdf.day
    pdf = pdf.assign(day=(day.map(dict(zip(calendar.day_abbr, range(7)))) + 1).astype(str) + '-' + day)
    return pdf
df.groupby('avg_users').applyInPandas(sortdf, schema=df.schema).show()
+-----+---------+
|  day|avg_users|
+-----+---------+
|3-Wed| 227214.0|
|1-Mon| 238195.5|
|4-Thu| 264620.0|
|6-Sat| 278482.0|
|7-Sun| 282905.5|
+-----+---------+
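The calendar.day_abbr mapping inside sortdf can be sanity-checked in plain pandas (a sketch with made-up sample values, outside Spark):

```python
import calendar
import pandas as pd

# calendar.day_abbr runs Mon..Sun, so zipping with range(7) maps Mon->0 ... Sun->6;
# adding 1 yields the 1-7 weekday numbers used as prefixes.
day = pd.Series(["Sun", "Mon", "Wed"])
mapping = dict(zip(calendar.day_abbr, range(7)))
labeled = (day.map(mapping) + 1).astype(str) + "-" + day
print(labeled.tolist())  # ['7-Sun', '1-Mon', '3-Wed']
```

Note the grouped output above is not guaranteed to come back in weekday order; an explicit .orderBy('day') after applyInPandas would make the ordering deterministic rather than incidental.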
CodePudding user response:
Using the .str accessor alone, without any actual vectorized transformation, was giving you the error. Also, you cannot use the whole Series as a key for your dow dict. Use the map method of pandas.Series instead:
from pyspark.sql.functions import *
import pandas as pd
data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)
@pandas_udf("string")
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return day.map(dow) + '-' + day
df.select(df.avg_users, p_udf(df.day).alias("day")).show()
+---------+-----+
|avg_users|  day|
+---------+-----+
| 282905.5|7-Sun|
| 238195.5|1-Mon|
| 264620.0|4-Thu|
| 278482.0|6-Sat|
| 227214.0|3-Wed|
+---------+-----+
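Since the assignment is to sort by day of week, note that the single-digit 1-7 prefix makes a plain lexicographic sort on the day column come out in weekday order, so appending .sort('day') to the select should finish the job. A pandas-only sketch of that final sorting step (using the question's sample data):

```python
import pandas as pd

# The "N-Day" labels sort lexicographically into weekday order because the
# prefix is a single digit 1-7:
dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
       "Fri": "5", "Sat": "6", "Sun": "7"}
pdf = pd.DataFrame({"day": ["Sun", "Mon", "Thu", "Sat", "Wed"],
                    "avg_users": [282905.5, 238195.5, 264620.0, 278482.0, 227214.0]})
pdf["day"] = pdf["day"].map(dow) + "-" + pdf["day"]
print(pdf.sort_values("day")["day"].tolist())
# ['1-Mon', '3-Wed', '4-Thu', '6-Sat', '7-Sun']
```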