How to mock this PySpark udf?


My dataframe :

[screenshot: dataframe with 'email' and 'expected' columns]

My udf:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def clean_email(email):
    try:
        regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        replace = {" ": "", "//": "", "/": ""}
        for i in replace:
            if i in email:
                email = email.replace(i, "")
        if email is not None or '.jpg' in email or email.startswith('http'):
            if email.endswith('.'):
                email = email[:len(email)-1]
            return ''.join(e for e in email if (e.isalnum() or e in ['.', '@', '-', '_']))
        else:
            return ""
    except Exception as x:
        print("Error occurred in email udf, Error: " + str(x))

The code below compares the 'expected' and 'Curated' columns.

My program:

df1 = context.spark.read.option("header", True).csv("./test/input/11-udf-test/Book1.csv", schema=schema)
df3 = sorted(df1.select(col("expected")).collect())
df2 = df1.withColumn("Curated", dataclean.clean_email(col("email")))
df4 = sorted(df2.select(col("Curated")).collect())
assert df3 == df4

Error:

test/udf_test.py:32: AssertionError
    ====================================================== short test summary info =======================================================
    FAILED test/udf_test.py::test_upper - AssertionError: assert [Row(expected...xpected=None)] == [Row([email protected]')]
    ==================================================== 1 failed in 

CodePudding user response:

The main thing you didn't grasp is that a Spark column (i.e. the result of the udf) does not really "exist" until you add it to a dataframe and run an action on that dataframe. In your case, you don't have a dataframe, so your udf never actually runs.

You could

  1. create a dataframe (spark.range(1))
  2. add a literal column (.select ... lit('some@gmail.com'))
  3. run the clean_email udf on the column
  4. extract the column's first row's value as a string (.head()[0])
  5. and only then compare it to your expected result

from pyspark.sql import functions as F
assert spark.range(1).select(clean_email(F.lit(' some@gmail.com'))).head()[0] == 'some@gmail.com'
# holds: the udf strips the leading space

Edit - since you showed how your dataframe looks.

Don't create separate dataframes for the "expected" and "Curated" columns, as you don't know whether the row order in both dataframes will be preserved. You could do something along these lines:

df1 = spark.createDataFrame(
    [(' some@gmail.com', 'some@gmail.com'),
     ('some@gmail.com.', 'some@gmail.com'),
     ('//some@gmail.com', 'some@gmail.com'),
     ('httpsgmail.com', '')],
    ['email', 'expected'])
df2 = df1.withColumn("Curated", clean_email("email"))
for row in df2.collect():
    assert row['expected'] == row['Curated']

AssertionError

You get the assertion error because 'httpsgmail.com' is not changed by your udf logic, i.e. "httpsgmail.com" != "".
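If you only want to unit-test the cleaning logic itself, without starting a SparkSession at all, one option (a sketch, not code from the question) is to keep the plain Python function separate from the udf wrapper and test it directly; recent PySpark versions also expose the original callable on a decorated udf as `clean_email.func`. The helper name `clean_email_plain` and the sample addresses below are placeholders:

```python
def clean_email_plain(email):
    # Same cleaning steps as the udf body, as a plain function that can be
    # tested without Spark. The None handling is simplified here: the
    # original udf would raise inside the try block and return None instead.
    if email is None:
        return ""
    for junk in (" ", "//", "/"):          # strip spaces and slashes
        email = email.replace(junk, "")
    if email.endswith('.'):                # drop a trailing dot
        email = email[:-1]
    # keep only alphanumerics and the allowed punctuation
    return ''.join(c for c in email if c.isalnum() or c in '.@-_')

# To use it in Spark, wrap it once:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import StringType
# clean_email = udf(clean_email_plain, StringType())

assert clean_email_plain(' some@gmail.com') == 'some@gmail.com'
assert clean_email_plain('some@gmail.com.') == 'some@gmail.com'
assert clean_email_plain('httpsgmail.com') == 'httpsgmail.com'  # unchanged, as noted above
```

This lets pytest exercise every branch of the cleaning logic quickly; the udf wrapper itself then only needs one dataframe-level check like the ones shown earlier.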
