How to make this pyspark udf faster?

Time:09-05

I am running this UDF with PySpark on EMR, using Spark 3.0.1 with YARN as the cluster manager. How can I make it faster?

I am using the external zipcodes package, and the call to zipcodes.matching is what takes the time.

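@udf(returnType=StringType())
def clean_zip(zip):
    try:
        # The step that derives zip_corrected from zip is missing from the post;
        # passing the input through unchanged is an assumption.
        zip_corrected = zip
        if len(zipcodes.matching(zip_corrected)) > 0:
            return zip_corrected
        else:
            return ""
    except Exception as x:
        # On error the UDF falls through and returns None (null in Spark)
        print("Error Occurred in zip udf, Error: " + str(x))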

CodePudding user response:

User-defined functions tend to perform worse than built-in Spark functions (see Spark functions vs UDF performance?).

I suggest loading the zip codes into a DataFrame and doing the lookup yourself with a SQL join. The zip DataFrame has only ~40K rows, which is an affordable cost when working with "big data".

Here's a demo:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import zipcodes

df = spark.read.csv("MOCK_DATA.csv", header='true')
df.show(10)
# +------------+--------------+-------+
# |        city|         state|zipcode|
# +------------+--------------+-------+
# |    Amarillo|         Texas|  79188|
# |  Vero Beach|       Florida|  32969|
# |   Charlotte|North Carolina|  28210|
# |   Rochester|      New York|  14639|
# |   Arlington|      Virginia|  22234|
# | San Antonio|         Texas|  78260|
# |      Austin|         Texas|  78764|
# |Cedar Rapids|          Iowa|  52410|
# |      Dallas|         Texas|  75216|
# |     Raleigh|North Carolina|  27605|
# +------------+--------------+-------+

@udf(returnType=StringType())
def clean_zip(zipcode):
  try:
    # zipcodes.matching returns a list of matching records; empty means no such zip
    if len(zipcodes.matching(zipcode)) > 0:
      return zipcode
    else:
      return ""
  except Exception as x:
    print("Error Occurred in zip udf, Error: " + str(x))


df.withColumn('zip_correct', clean_zip(df.zipcode)).show(10)
# +------------+--------------+-------+-----------+
# |        city|         state|zipcode|zip_correct|
# +------------+--------------+-------+-----------+
# |    Amarillo|         Texas|  79188|           |
# |  Vero Beach|       Florida|  32969|      32969|
# |   Charlotte|North Carolina|  28210|      28210|
# |   Rochester|      New York|  14639|      14639|
# |   Arlington|      Virginia|  22234|      22234|
# | San Antonio|         Texas|  78260|      78260|
# |      Austin|         Texas|  78764|      78764|
# |Cedar Rapids|          Iowa|  52410|      52410|
# |      Dallas|         Texas|  75216|      75216|
# |     Raleigh|North Carolina|  27605|      27605|
# +------------+--------------+-------+-----------+

Load the zip codes table into a DataFrame named zips

# data from mockaroo
# get zips.json.bz2 from the zipcodes repository
# sorting by zip_code should speed up lookup
zips = spark.read.json("zips.json.bz2").orderBy("zip_code", ascending=False)
zips.count()
# 42724

Left join

# left join
df.join(zips, df.zipcode==zips.zip_code, 'left') \
  .select(df.city, df.state, df.zipcode, zips.zip_code.alias("zip_correct")) \
  .show(10)
# +------------+--------------+-------+-----------+
# |        city|         state|zipcode|zip_correct|
# +------------+--------------+-------+-----------+
# |    Amarillo|         Texas|  79188|       null|
# |  Vero Beach|       Florida|  32969|      32969|
# |   Charlotte|North Carolina|  28210|      28210|
# |   Rochester|      New York|  14639|      14639|
# |   Arlington|      Virginia|  22234|      22234|
# | San Antonio|         Texas|  78260|      78260|
# |      Austin|         Texas|  78764|      78764|
# |Cedar Rapids|          Iowa|  52410|      52410|
# |      Dallas|         Texas|  75216|      75216|
# |     Raleigh|North Carolina|  27605|      27605|
# +------------+--------------+-------+-----------+
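
Note that the left join yields null for an unknown zip code, whereas the original UDF returned an empty string. The sketch below is one possible way to close that gap; it is not part of the original answer. The function names valid_zip_rows and add_zip_correct are hypothetical. The broadcast hint and the distinct() call are assumptions: the first presumes the ~40K-row lookup table fits comfortably in executor memory, the second guards against the table holding more than one record per zip code, which would otherwise duplicate rows in the join.

```python
def valid_zip_rows(zip_records):
    """Turn zipcodes-style records (dicts with a 'zip_code' key) into
    sorted, de-duplicated 1-tuples suitable for spark.createDataFrame."""
    return sorted({(rec["zip_code"],) for rec in zip_records})


def add_zip_correct(df, zips_df):
    """Add a zip_correct column via a broadcast left join.

    Assumes df has a 'zipcode' column and zips_df has a 'zip_code' column,
    as in the demo above. Unknown zip codes become "" (like the UDF),
    not null.
    """
    # Imported here so valid_zip_rows can be used without a Spark installation.
    from pyspark.sql import functions as F

    # One row per valid zip code; duplicates would multiply rows in the join.
    lookup = zips_df.select(F.col("zip_code").alias("zip_correct")).distinct()

    # The broadcast hint asks Spark to ship the small lookup table to every
    # executor instead of shuffling the large DataFrame.
    joined = df.join(
        F.broadcast(lookup),
        df["zipcode"] == lookup["zip_correct"],
        "left",
    )

    # Replace the nulls produced by the left join with an empty string.
    return joined.withColumn(
        "zip_correct", F.coalesce(F.col("zip_correct"), F.lit(""))
    )
```

With the df and zips DataFrames from the demo, add_zip_correct(df, zips).show(10) should give an empty zip_correct for Amarillo rather than null. If downloading zips.json.bz2 is inconvenient, the lookup table could probably be built from the package itself with spark.createDataFrame(valid_zip_rows(zipcodes.list_all()), ["zip_code"]), assuming the installed zipcodes version provides list_all() and returns dicts with a zip_code key.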