I have a UDF that compares two strings, str_left and str_right, but it fails if either is null.
I thought it should be possible to 'protect' the UDF with a case expression, as follows:
select
    case
        when str_left is null or str_right is null then -1
        else my_udf(str_left, str_right)
    end as my_col
from my_table
But this fails in practice. Why does this not work?
Here is a complete example in PySpark which produces the error TypeError: object of type 'NoneType' has no len() in both Spark 2.4.3 and Spark 3.1.2.
from pyspark.context import SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import DoubleType

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

def rel_length(str1, str2):
    return len(str1) / len(str2)

spark.udf.register("rel_length_py", rel_length, DoubleType())
rows = [
    {"str_col_l": "a string", "str_col_r": "another string"},
    {"str_col_l": "a string", "str_col_r": None},
]
df = spark.createDataFrame(Row(**x) for x in rows)
df.createOrReplaceTempView("str_comp")
sql = """select
case
when str_col_r is null or str_col_l is null then -1
else rel_length_py(str_col_l, str_col_r)
end
as rel
from str_comp
"""
spark.sql(sql).show()
I've tried to simplify this down to the reproducible example above. The 'real-world' problem we're encountering is a similar case statement using this UDF. Here's a gist with the code that produces the error. Strangely, this more complex example fails in Spark 3.1.2 but succeeds in 2.4.3.
CodePudding user response:
In many situations, the Spark optimiser will execute ALL parts of your case expression, even though some appear to be unreachable.
In the example given in the question, we can show that Spark executes BOTH
when str_col_r is null or str_col_l is null then -1
AND
else rel_length_py(str_col_l, str_col_r)
even in cases where str_col_r or str_col_l is null.
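One way to see this directly, without timing tricks, is to inspect the physical plan. A minimal sketch, assuming the str_comp view, the rel_length_py UDF and the sql string from the question are already defined:
spark.sql(sql).explain()
# In the plans I have seen on the versions discussed here, the Python UDF is
# pulled out of the CASE into its own BatchEvalPython stage that runs for
# every row; the CASE in the Project above it then merely chooses between
# the UDF's precomputed result and -1.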
Here is some example code. The dataframe is as follows, where the second row is repeated 100 times.
| str_col_l | str_col_r |
|:------------|:------------|
| a | b |
| a string | null |
| a string | null |
| a string | null |
...96 repeats...
| a string | null |
I have set the following, so that all rows are processed serially in a single partition and the per-row sleeps add up:
conf.set("spark.sql.shuffle.partitions", "1")
conf.set("spark.default.parallelism", "1")
We run a UDF that sleeps for 1 second whenever it is executed:
%%time
import time

def rel_length(str1, str2):
    time.sleep(1)  # sleep on every invocation
    if str1 is None or str2 is None:
        return -0.9
    return len(str1) / len(str2)

spark.udf.register("rel_length_py", rel_length, DoubleType())
rows = [{"str_col_l": "a", "str_col_r": "b"}] [{"str_col_l": "a string", "str_col_r": None}]*100
df = spark.createDataFrame(Row(**x) for x in rows)
df.createOrReplaceTempView("str_comp")
sql = """select
case
when str_col_r is null or str_col_l is null then -1
else rel_length_py(str_col_l, str_col_r)
end
as rel
from str_comp
"""
spark.sql(sql).toPandas().head(2)
CPU times: user 183 ms, sys: 61.8 ms, total: 245 ms
Wall time: 1min 46s
i.e. around 100 seconds: roughly one second per row, meaning the UDF's sleep ran even for the rows where str_col_r is null.
Here the sleep statement is repositioned in the UDF so it sleeps for 1 second ONLY for the first row.
%%time
def rel_length(str1, str2):
    if str1 is None or str2 is None:
        return -0.9  # null inputs return immediately, without sleeping
    time.sleep(1)
    return len(str1) / len(str2)

spark.udf.register("rel_length_py", rel_length, DoubleType())
rows = [{"str_col_l": "a", "str_col_r": "b"}] [{"str_col_l": "a string", "str_col_r": None}]*100
df = spark.createDataFrame(Row(**x) for x in rows)
df.createOrReplaceTempView("str_comp")
sql = """select
case
when str_col_r is null or str_col_l is null then -1
else rel_length_py(str_col_l, str_col_r)
end
as rel
from str_comp
"""
spark.sql(sql).toPandas().head(2)
CPU times: user 14.5 ms, sys: 6.42 ms, total: 20.9 ms
Wall time: 1.36 s
This proves that, at least in some instances, every part of the case statement will execute. I don't believe all parts are guaranteed to execute, because I've seen working examples that would error if every branch were always evaluated.
CodePudding user response:
It's funny that, for some reason, Spark executes the UDF (at least for some rows) even when it does not use the result. Sometimes Catalyst manages to avoid this, for example when the column is generated with lit(None), but most of the time it does not.
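For example, a minimal sketch of the lit(None) case mentioned above (df is the dataframe from the question; overwriting str_col_r is purely illustrative):
from pyspark.sql import functions as F

# str_col_r is now null by construction, which Catalyst can sometimes use
# to prove the WHEN branch always fires and skip the UDF call entirely.
df2 = df.withColumn("str_col_r", F.lit(None).cast("string"))
df2.createOrReplaceTempView("str_comp")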
The easiest way to solve this is to modify your UDF to handle that case itself:
def rel_length(str1, str2):
    if str1 is None or str2 is None:
        return -1.0  # return a float: the UDF is registered as DoubleType, and an int here can come back as null
    else:
        return len(str1) / len(str2)
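With the null check inside the UDF, the CASE expression is no longer needed at all. A quick sketch, reusing the registration style and view from the question:
spark.udf.register("rel_length_py", rel_length, DoubleType())
spark.sql("select rel_length_py(str_col_l, str_col_r) as rel from str_comp").show()
# The row with a null now yields -1.0 from inside the UDF instead of raising TypeError.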