My dataframe:

My UDF is below:
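```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def clean_email(email):
    try:
        # Email pattern (defined but not used below)
        regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        # Substrings to strip from the raw value
        replace = {" ": "", "//": "", "/": ""}
        for i in replace:
            if i in email:
                email = email.replace(i, "")
        if email is not None or '.jpg' in email or email.startswith('http'):
            # Drop a trailing dot
            if email.endswith('.'):
                email = email[:len(email) - 1]
            # Keep only alphanumerics and . @ - _
            return ''.join(e for e in email if (e.isalnum() or e in ['.', '@', '-', '_']))
        else:
            return ""
    except Exception as x:
        print("Error Occurred in email udf, Error: " + str(x))
```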
The code below is used to compare the 'expected' and 'Curated' columns.

My program:
```python
from pyspark.sql.functions import col

df1 = context.spark.read.option("header", True).csv("./test/input/11-udf-test/Book1.csv", schema=schema)
df3 = sorted(df1.select(col("expected")).collect())
df2 = df1.withColumn("Curated", dataclean.clean_email(col("email")))
df4 = sorted(df2.select(col("Curated")).collect())
assert df3 == df4
```
Error:
```
test/udf_test.py:32: AssertionError
====================================================== short test summary info =======================================================
FAILED test/udf_test.py::test_upper - AssertionError: assert [Row(expected...xpected=None)] == [Row([email protected]')]
==================================================== 1 failed in
```
Answer:
The main thing you didn't grasp is that a Spark column (i.e. the result of the UDF) does not really "exist" until you add it to a dataframe and run an action on that dataframe. In your case you don't have a dataframe, so your UDF never actually runs.
You could:

- create a dataframe (`spark.range(1)`),
- add a literal column (`.select ... lit('[email protected]')`),
- run the `clean_email` UDF on that column,
- extract the first row's value of the column as a string (`.head()[0]`),
- and only then compare it to your expected result.
```python
from pyspark.sql import functions as F

assert spark.range(1).select(clean_email(F.lit('[email protected]'))).head()[0] == '[email protected]'
# The comparison is False, so this assert fails
```
Edit, now that you have shown what your dataframe looks like:

Don't create separate dataframes for the "expected" and "Curated" columns, because you can't be sure the row order is preserved in both. You could do something along these lines:
```python
df1 = spark.createDataFrame(
    [(' [email protected]', '[email protected]'),
     ('[email protected].', '[email protected]'),
     ('//[email protected]', '[email protected]'),
     ('httpsgmail.com', '')],
    ['email', 'expected'])
df2 = df1.withColumn("Curated", clean_email("email"))
for row in df2.collect():
    assert row['expected'] == row['Curated']
```

```
AssertionError
```
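You get the assertion error because `httpsgmail.com` is not changed by your UDF logic, i.e. `"httpsgmail.com" != ""`. The condition `email is not None or '.jpg' in email or email.startswith('http')` is `True` for every non-null string, so the `else: return ""` branch is never reached.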
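If the intent is that null values, image links and URLs should all become an empty string (an assumption, inferred from the expected value `''` for `httpsgmail.com`), one option is to check those cases explicitly and return early. The sketch below is plain Python, so the logic can be unit-tested without a SparkSession. The name `clean_email_logic` and the `example.com` addresses are hypothetical; it keeps the original order of operations (strip spaces and slashes first, then check for URLs).

```python
# Hypothetical plain-Python version of the cleaning logic.
# Assumption: None, '.jpg' links and values starting with 'http' should all map to "".
def clean_email_logic(email):
    if email is None:
        return ""
    # Strip spaces and slashes, as the original UDF does
    for junk in (" ", "//", "/"):
        email = email.replace(junk, "")
    # Assumed intent: image links and URLs are not emails
    if '.jpg' in email or email.startswith('http'):
        return ""
    # Drop a single trailing dot
    if email.endswith('.'):
        email = email[:-1]
    # Keep only alphanumerics and . @ - _
    return ''.join(e for e in email if e.isalnum() or e in '.@-_')


# Row-wise check on hypothetical (email, expected) pairs
cases = [
    (' user@example.com', 'user@example.com'),
    ('user@example.com.', 'user@example.com'),
    ('//user@example.com', 'user@example.com'),
    ('httpsgmail.com', ''),
]
for email, expected in cases:
    assert clean_email_logic(email) == expected, (email, expected)
```

To use it in Spark, you could wrap the same function instead of decorating it, e.g. `clean_email = udf(clean_email_logic, StringType())`, so the tested logic and the UDF stay identical.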