Pyspark -- Filter ArrayType rows which contain null value


I am a beginner with PySpark. Suppose I have a Spark DataFrame like this:

import pandas as pd
test_df = spark.createDataFrame(pd.DataFrame({"a": [[1, 2, 3], [None, 2, 3], [None, None, None]]}))

Now I want to keep only the rows whose array does NOT contain any None value (in my case, just the first row).

I have tried to use:

test_df.filter(array_contains(test_df.a, None))

But it does not work and throws an error:

AnalysisException: "cannot resolve 'array_contains(a, NULL)' due to data type mismatch: Null typed values cannot be used as arguments;;\n'Filter array_contains(a#166, null)\n - LogicalRDD [a#166], false\n

What is the correct way to filter this? Many thanks!

CodePudding user response:

You need to use the forall higher-order function, which checks that a predicate holds for every element of the array:

from pyspark.sql import functions as F

df = test_df.filter(F.expr('forall(a, x -> x is not null)'))
df.show(truncate=False)
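
If your Spark version is 3.1 or newer, the same check can also be written with the forall function exposed in pyspark.sql.functions instead of a SQL expression string. A minimal sketch, assuming the test_df from the question:

from pyspark.sql import functions as F

# keep rows whose array `a` has no null elements (equivalent to the SQL forall expression above)
test_df.filter(F.forall("a", lambda x: x.isNotNull())).show(truncate=False)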

CodePudding user response:

You can use the aggregate higher-order function to count the number of nulls in each array and then filter the rows where the count is 0. This drops every row with at least one None inside the array.

data_ls = [
    (1, ["A", "B"]),
    (2, [None, "D"]),
    (3, [None, None])
]

data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['a', 'b'])

data_sdf.show()

+---+------+
|  a|     b|
+---+------+
|  1|[A, B]|
|  2| [, D]|
|  3|   [,]|
+---+------+

from pyspark.sql import functions as func

# count the number of nulls within each array
data_sdf. \
    withColumn('c', func.expr('aggregate(b, 0, (x, y) -> x + int(y is null))')). \
    show()

+---+------+---+
|  a|     b|  c|
+---+------+---+
|  1|[A, B]|  0|
|  2| [, D]|  1|
|  3|   [,]|  2|
+---+------+---+

Once the column is created, you can apply the filter as filter(func.col('c') == 0), as shown in the sketch below.
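
Putting the two steps together, a minimal sketch (reusing data_sdf and the func alias from above) that counts the nulls, filters, and drops the helper column in one chain:

# keep only rows whose arrays contain no nulls, then drop the helper count column
data_sdf. \
    withColumn('c', func.expr('aggregate(b, 0, (x, y) -> x + int(y is null))')). \
    filter(func.col('c') == 0). \
    drop('c'). \
    show()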

CodePudding user response:

You can use the exists function, negated so that only arrays without nulls are kept:

test_df.filter("!exists(a, x -> x is null)").show() 

#+---------+
#|        a|
#+---------+
#|[1, 2, 3]|
#+---------+
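
On Spark 3.1 or newer, the same idea can also be written with the exists function from pyspark.sql.functions rather than a SQL string. A minimal sketch, assuming the test_df from the question:

from pyspark.sql import functions as F

# drop rows where any element of `a` is null (~ negates the exists result)
test_df.filter(~F.exists("a", lambda x: x.isNull())).show()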