Spark: Using null checking in a CASE WHEN expression to protect against type errors

I have a UDF that compares two strings str_left and str_right, but it fails if either is null.

I thought it should be possible to 'protect' the UDF with a CASE expression as follows:

select 
    case 
    when str_left is null or str_right is null then -1 
    else my_udf(str_left, str_right)
    end as my_col
from my_table

But this fails in practice. Why does this not work?

Here is a complete example in PySpark, which produces the error TypeError: object of type 'NoneType' has no len() in both Spark 2.4.3 and Spark 3.1.2.

from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
from pyspark.sql import Row

def rel_length(str1, str2):
    # len(None) raises TypeError: object of type 'NoneType' has no len()
    return len(str1)/len(str2)

spark.udf.register("rel_length_py", rel_length, DoubleType())

rows = [
    {"str_col_l": "a string", "str_col_r": "another string"},
    {"str_col_l": "a string", "str_col_r": None},
]
df = spark.createDataFrame(Row(**x) for x in rows)
df.createOrReplaceTempView("str_comp")

sql = """select 
    case
    when str_col_r is null or str_col_l is null  then -1
    else rel_length_py(str_col_l, str_col_r) 
    end
    as rel
from str_comp
"""
spark.sql(sql).show()

I've tried to simplify this down to the reproducible example above. The 'real world' problem we're encountering is a similar CASE statement with this UDF. Here's a gist with the code that produces the error. Strangely, this more complex example fails in Spark 3.1.2 but succeeds in Spark 2.4.3.

CodePudding user response:

In many situations, the Spark optimiser will execute ALL parts of your case expression, even though some appear to be unreachable.

In the example given in the question, we can show that Spark executes BOTH:

when str_col_r is null or str_col_l is null  then -1

AND

else rel_length_py(str_col_l, str_col_r) 

even in cases where str_col_r is null or str_col_l is null.
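One way to see why (not part of the original demonstration, and the plan text varies between Spark versions) is to inspect the physical plan for the question's query: Python UDFs are typically pulled out into a separate BatchEvalPython stage that runs before the projection containing the CASE expression, so the UDF can be evaluated for every row regardless of which branch applies.

spark.sql(sql).explain()

# Illustrative output only -- the exact plan text differs between versions:
# == Physical Plan ==
# *(2) Project [CASE WHEN (isnull(str_col_r) OR isnull(str_col_l)) THEN -1.0
#               ELSE pythonUDF0 END AS rel]
# +- BatchEvalPython [rel_length_py(str_col_l, str_col_r)], [pythonUDF0]
#    +- Scan ExistingRDD[str_col_l, str_col_r]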

Here is some example code. The DataFrame is as follows, where the second row is repeated 100 times.


| str_col_l   | str_col_r   |
|:------------|:------------|
| a           | b           |
| a string    | null        |
| a string    | null        |
| a string    | null        |
| ...         | ...96 repeats... |
| a string    | null        |

I have set:

conf.set("spark.sql.shuffle.partitions", "1")
conf.set("spark.default.parallelism", "1")

We run a UDF that sleeps for 1 second whenever it is executed:

%%time

import time

def rel_length(str1, str2):
    # Sleep unconditionally, so every invocation of the UDF costs 1 second
    time.sleep(1)
    if str1 is None or str2 is None:
        return -0.9
    return len(str1)/len(str2)

spark.udf.register("rel_length_py", rel_length, DoubleType())

rows = [{"str_col_l": "a", "str_col_r": "b"}] + [{"str_col_l": "a string", "str_col_r": None}]*100

df = spark.createDataFrame(Row(**x) for x in rows)
df.createOrReplaceTempView("str_comp")

sql = """select 
    case
    when str_col_r is null or str_col_l is null  then -1
    else rel_length_py(str_col_l, str_col_r) 
    end
    as rel
from str_comp
"""
spark.sql(sql).toPandas().head(2)
CPU times: user 183 ms, sys: 61.8 ms, total: 245 ms
Wall time: 1min 46s

i.e. around 100 seconds: one 1-second sleep per row across all 101 rows, including the 100 rows where str_col_r is null, so the else branch ran even where the when branch matched.

Here the sleep statement is repositioned in the UDF so it sleeps for 1 second ONLY for the first row.

%%time

def rel_length(str1, str2):
    # Return early for nulls, so only the single non-null row pays the 1-second sleep
    if str1 is None or str2 is None:
        return -0.9
    time.sleep(1)
    return len(str1)/len(str2)

spark.udf.register("rel_length_py", rel_length, DoubleType())

rows = [{"str_col_l": "a", "str_col_r": "b"}] + [{"str_col_l": "a string", "str_col_r": None}]*100

df = spark.createDataFrame(Row(**x) for x in rows)
df.createOrReplaceTempView("str_comp")

sql = """select 
    case
    when str_col_r is null or str_col_l is null  then -1
    else rel_length_py(str_col_l, str_col_r) 
    end
    as rel
from str_comp
"""
spark.sql(sql).toPandas().head(2)
CPU times: user 14.5 ms, sys: 6.42 ms, total: 20.9 ms
Wall time: 1.36 s

This proves that, at least in some instances, all parts of the CASE statement will execute. I don't believe all parts are guaranteed to execute, because I've seen working examples that would error if every branch were always evaluated.

CodePudding user response:

It's funny that for some reason, Spark executes the UDF (at least for some rows) even if it does not use the result. Sometimes Catalyst manages to avoid this, for example when the column is generated with lit(None), but most of the time it does not.

The easiest way to solve this is to modify your UDF to handle nulls itself:

def rel_length(str1, str2):
    if str1 is None or str2 is None:
        # return a float, so the value matches a DoubleType registration
        return -1.0
    else:
        return len(str1) / len(str2)
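
For completeness, here is a minimal sketch of using the fixed UDF, assuming the spark session, the DoubleType import and the str_comp temp view from the question are still in scope:

spark.udf.register("rel_length_py", rel_length, DoubleType())

# The original query now succeeds even if Spark evaluates the else
# branch for null rows, because the UDF itself tolerates None:
spark.sql("""select
    case
    when str_col_r is null or str_col_l is null then -1
    else rel_length_py(str_col_l, str_col_r)
    end as rel
from str_comp
""").show()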