Spark version: 2.3.0
I have a PySpark DataFrame with an array column, and I want to filter the elements of each array by applying string-matching conditions. For example, given a DataFrame like this:
Array Col
['apple', 'banana', 'orange']
['strawberry', 'raspberry']
['apple', 'pineapple', 'grapes']
I want to keep only the elements within each array that contain the string 'apple', or that start with 'app', and so on. How can I achieve this in PySpark?
CodePudding user response:
You can use higher-order functions, which are available from Spark 2.4 onward (so not on 2.3.0):
from pyspark.sql import functions as F
df.withColumn("Filtered_Col", F.expr("filter(Array_Col, x -> x rlike '^(?i)app')")).show()
+--------------------------+------------+
|Array_Col                 |Filtered_Col|
+--------------------------+------------+
|[apple, banana, orange]   |[apple]     |
|[strawberry, raspberry]   |[]          |
|[apple, pineapple, grapes]|[apple]     |
+--------------------------+------------+
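The question mentions both "contains 'apple'" and "starts with 'app'". Since rlike matches anywhere in the string unless the pattern is anchored, the two cases differ only in the pattern. A minimal sketch that builds both expression strings follows; the helper name array_filter_expr is hypothetical (it is not part of Spark), and it assumes the pattern contains no single quotes or backslashes, which may need extra escaping inside a SQL string literal.

```python
def array_filter_expr(column, pattern, ignore_case=True):
    """Build the SQL text: filter(<column>, x -> x rlike '<pattern>').

    Hypothetical helper for illustration only; it does no escaping.
    """
    if "'" in pattern or "\\" in pattern:
        raise ValueError("this sketch does not handle quotes or backslashes")
    flag = "(?i)" if ignore_case else ""
    return f"filter({column}, x -> x rlike '{flag}{pattern}')"


# Anchored with ^ : only elements that START with "app"
starts_with_app = array_filter_expr("Array_Col", "^app")

# Unanchored: any element that CONTAINS "apple" (so "pineapple" matches too)
contains_apple = array_filter_expr("Array_Col", "apple")

print(starts_with_app)
print(contains_apple)
```

Either string can then be passed to F.expr in the withColumn call shown above, for example df.withColumn("Filtered_Col", F.expr(contains_apple)).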
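For lower versions, you can fall back to a UDF:
import re
from pyspark.sql import functions as F, types as T

def myf(v):
    l = []
    for i in v:
        # re.match already anchors at the start of the string, so '^' is not needed.
        # The flag is passed separately because Python 3.11+ rejects an inline (?i)
        # that is not at the very start of the pattern.
        if re.match('app', i, re.IGNORECASE):
            l.append(i)
    return l

myudf = F.udf(myf, T.ArrayType(T.StringType()))
df.withColumn("Filtered_Col", myudf("Array_Col")).show()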
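If several different patterns are needed, the matching logic can be kept in a plain Python function and wrapped in a UDF per pattern. The sketch below is one possible way to do that; filter_elements and make_filter_udf are hypothetical names, and the handling of null arrays and null elements is an assumption about what the data may contain. The Spark import is deferred so that the pure-Python part can be checked without a Spark installation.

```python
import re


def filter_elements(values, pattern, flags=re.IGNORECASE):
    """Return the elements of `values` in which `pattern` is found.

    re.search is used, so an unanchored pattern means "contains" and a
    pattern starting with ^ means "starts with".
    """
    if values is None:  # a null array stays null
        return None
    rx = re.compile(pattern, flags)
    return [v for v in values if v is not None and rx.search(v)]


def make_filter_udf(pattern):
    """Wrap filter_elements in a Spark UDF for one fixed pattern."""
    # Imported here so the helper above works without pyspark installed.
    from pyspark.sql import functions as F, types as T

    return F.udf(
        lambda values: filter_elements(values, pattern),
        T.ArrayType(T.StringType()),
    )


print(filter_elements(['apple', 'pineapple', 'grapes'], '^app'))   # starts with
print(filter_elements(['apple', 'pineapple', 'grapes'], 'apple'))  # contains
```

With a DataFrame like the one above, this could be used as df.withColumn("Filtered_Col", make_filter_udf("^app")("Array_Col")).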
CodePudding user response:
You can use filter in conjunction with exists, one of the higher-order functions, which checks whether any element of the array satisfies a condition. Note that this keeps or drops whole rows; it does not remove elements from within each array.
The other approach is a UDF.
Data Preparation
# `sql` is assumed to be an existing SparkSession (or SQLContext)
sparkDF = sql.createDataFrame(
    [
        (['apple', 'banana', 'orange'],),
        (['strawberry', 'raspberry'],),
        (['apple', 'pineapple', 'grapes'],),
    ],
    ['arr_column'],
)
sparkDF.show(truncate=False)
+--------------------------+
|arr_column                |
+--------------------------+
|[apple, banana, orange]   |
|[strawberry, raspberry]   |
|[apple, pineapple, grapes]|
+--------------------------+
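Filter & Exists (the SQL function exists needs Spark >= 2.4; the Python wrapper F.exists needs PySpark >= 3.1)
from pyspark.sql import functions as F
starts_with_app = lambda s: s.startswith("app")
sparkDF_filtered = sparkDF.filter(F.exists(F.col("arr_column"), starts_with_app))
sparkDF_filtered.show(truncate=False)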
+--------------------------+
|arr_column                |
+--------------------------+
|[apple, banana, orange]   |
|[apple, pineapple, grapes]|
+--------------------------+
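The output above still contains 'banana' and 'orange' because exists decides whether a row is kept, while the array filter function decides which elements are kept. The difference can be modelled in plain Python on the same three sample arrays. This is only an analogy, not Spark code: any() stands in for exists and a list comprehension stands in for the array filter.

```python
rows = [
    ['apple', 'banana', 'orange'],
    ['strawberry', 'raspberry'],
    ['apple', 'pineapple', 'grapes'],
]


def starts_with_app(s):
    return s.startswith("app")


# exists-style: keep the WHOLE array if any element matches
kept_rows = [arr for arr in rows if any(starts_with_app(s) for s in arr)]

# filter-style: keep every row, but only the matching elements
filtered_elements = [[s for s in arr if starts_with_app(s)] for arr in rows]

print(kept_rows)
print(filtered_elements)
```

If the goal is to trim the arrays themselves, as in the question, the element-level form is the one that applies; the row-level form is useful when whole rows without a match should be discarded.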
UDF - Lower Versions as well
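from pyspark.sql.types import ArrayType, StringType

def filter_string(inp):
    # Collect every element that starts with "app"
    res = []
    for s in inp:
        if s.startswith("app"):
            res.append(s)  # keep all matches, not just the last one
    # Return None when nothing matches so the row can be dropped below
    return res if res else None

filter_string_udf = F.udf(filter_string, ArrayType(StringType()))

sparkDF_filtered = (
    sparkDF
    .withColumn('arr_filtered', filter_string_udf(F.col('arr_column')))
    .filter(F.col('arr_filtered').isNotNull())
)
sparkDF_filtered.show(truncate=False)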
+--------------------------+------------+
|arr_column                |arr_filtered|
+--------------------------+------------+
|[apple, banana, orange]   |[apple]     |
|[apple, pineapple, grapes]|[apple]     |
+--------------------------+------------+