I am executing this UDF through PySpark on EMR, using Spark 3.0.1 with the YARN resource manager. How can I make this UDF faster?
I am using the external parser zipcodes; the matching step is what takes most of the time.
@udf(returnType=StringType())
def clean_zip(zip_corrected):
    try:
        if len(zipcodes.matching(zip_corrected)) > 0:
            return zip_corrected
        else:
            return ""
    except Exception as x:
        print("Error occurred in zip udf, Error: " + str(x))
CodePudding user response:
User-defined functions tend to have lower performance than Spark's built-in functions (see Spark functions vs UDF performance?).
I suggest that you load the zip codes into a dataframe and do the lookup yourself with a SQL join. The zip dataframe has only ~40K rows, and one can afford that when working with "big data".
Here's a demo:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import zipcodes
df = spark.read.csv("MOCK_DATA.csv", header='true')
df.show(10)
# +------------+--------------+-------+
# |        city|         state|zipcode|
# +------------+--------------+-------+
# |    Amarillo|         Texas|  79188|
# |  Vero Beach|       Florida|  32969|
# |   Charlotte|North Carolina|  28210|
# |   Rochester|      New York|  14639|
# |   Arlington|      Virginia|  22234|
# | San Antonio|         Texas|  78260|
# |      Austin|         Texas|  78764|
# |Cedar Rapids|          Iowa|  52410|
# |      Dallas|         Texas|  75216|
# |     Raleigh|North Carolina|  27605|
# +------------+--------------+-------+
@udf(returnType=StringType())
def clean_zip(zipcode):
    try:
        if len(zipcodes.matching(zipcode)) > 0:
            return zipcode
        else:
            return ""
    except Exception as x:
        print("Error occurred in zip udf, Error: " + str(x))
df.withColumn('zip_correct', clean_zip(df.zipcode)).show(10)
# +------------+--------------+-------+-----------+
# |        city|         state|zipcode|zip_correct|
# +------------+--------------+-------+-----------+
# |    Amarillo|         Texas|  79188|           |
# |  Vero Beach|       Florida|  32969|      32969|
# |   Charlotte|North Carolina|  28210|      28210|
# |   Rochester|      New York|  14639|      14639|
# |   Arlington|      Virginia|  22234|      22234|
# | San Antonio|         Texas|  78260|      78260|
# |      Austin|         Texas|  78764|      78764|
# |Cedar Rapids|          Iowa|  52410|      52410|
# |      Dallas|         Texas|  75216|      75216|
# |     Raleigh|North Carolina|  27605|      27605|
# +------------+--------------+-------+-----------+
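If you want to stay with a UDF, Spark 3.0 (your version) also supports pandas UDFs, which ship rows to Python in Arrow batches instead of one at a time and usually cut the serialization overhead. A sketch under that assumption; clean_zip_vec and _is_valid are illustrative names, and pyarrow must be installed on the cluster:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd
import zipcodes

def _is_valid(z):
    # zipcodes.matching raises on malformed input, hence the guard
    try:
        return bool(zipcodes.matching(z))
    except Exception:
        return False

@pandas_udf(StringType())
def clean_zip_vec(s: pd.Series) -> pd.Series:
    # matching still runs per value, but Arrow batching reduces the
    # per-row Python serialization cost of a plain UDF
    return s.map(lambda z: z if _is_valid(z) else "")

df.withColumn('zip_correct', clean_zip_vec(df.zipcode)).show(10)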
Save the zip codes table into a dataframe zips
# MOCK_DATA.csv above was generated with mockaroo
# get zips.json.bz2 from the zipcodes repository
# sorting by zip_code should speed up the lookup
zips = spark.read.json("zips.json.bz2").orderBy("zip_code", ascending=False)
zips.count()
# 42724
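If zips.json ever contained more than one row per zip_code, the left join below would duplicate matching rows; deduplicating first rules that out (a one-line sketch, assuming zips as defined above):
# keep one row per zip_code so the join below stays one-to-one
zips = zips.dropDuplicates(["zip_code"])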
Left join
# left join
df.join(zips, df.zipcode==zips.zip_code, 'left') \
.select(df.city, df.state, df.zipcode, zips.zip_code.alias("zip_correct")) \
.show(10)
# +------------+--------------+-------+-----------+
# |        city|         state|zipcode|zip_correct|
# +------------+--------------+-------+-----------+
# |    Amarillo|         Texas|  79188|       null|
# |  Vero Beach|       Florida|  32969|      32969|
# |   Charlotte|North Carolina|  28210|      28210|
# |   Rochester|      New York|  14639|      14639|
# |   Arlington|      Virginia|  22234|      22234|
# | San Antonio|         Texas|  78260|      78260|
# |      Austin|         Texas|  78764|      78764|
# |Cedar Rapids|          Iowa|  52410|      52410|
# |      Dallas|         Texas|  75216|      75216|
# |     Raleigh|North Carolina|  27605|      27605|
# +------------+--------------+-------+-----------+
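Because zips is only ~40K rows, it can also be broadcast to every executor, which removes the shuffle from the join; coalesce then restores the empty-string behaviour of the original UDF instead of null. A sketch, assuming the dataframes above:
from pyspark.sql.functions import broadcast, coalesce, lit

# broadcast the small zips table; coalesce maps missing matches to ""
df.join(broadcast(zips), df.zipcode == zips.zip_code, 'left') \
  .select(df.city, df.state, df.zipcode,
          coalesce(zips.zip_code, lit("")).alias("zip_correct")) \
  .show(10)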