Filter Pyspark Dataframe column based on whether it contains or does not contain substring


I have a PySpark DataFrame message_df with millions of rows that looks like this:

id     message
ab123  Hello my name is Chris
cd345  The room should be 2301
ef567  Welcome! What is your name?
gh873  That way please
kj893  The current year is 2022

and two lists

wanted_words = ['name','room']
unwanted_words = ['welcome','year']

I only want to get rows where message contains any of the words in wanted_words and does not contain any of the words in unwanted_words, hence the result should be:

id     message
ab123  Hello my name is Chris
cd345  The room should be 2301

As of right now I am doing it word by word:

message_df.select(F.lower(F.col('message'))).filter(
    (
        F.col('lower(message)').contains('name') |
        F.col('lower(message)').contains('room')
    ) & (
        ~F.col('lower(message)').contains('welcome') &
        ~F.col('lower(message)').contains('year')
    )
)

This is very tedious to code. However, when I instead use rlike:

wanted_words ="(name|room)"
unwanted_words ="(welcome|year)"

message_df.select(lower(F.col('message'))).filter(
   ~F.col('lower(message)').rlike(not_contain) &
    F.col('lower(message)').rlike(contain)
)

the process slows down immensely. Is this because rlike is significantly slower, and if so, what is a better way of filtering when wanted_words and unwanted_words may contain hundreds of words?

CodePudding user response:

Split the text into tokens/words and use the arrays_overlap function to check whether a wanted or unwanted token is present:

df = df.filter(
    (
        F.arrays_overlap(
            F.split(F.regexp_replace(F.lower("message"), r"[^a-zA-Z0-9\s]+", ""), r"\s+"),
            F.array([F.lit(c) for c in wanted_words])
        )
    )
    &
    (
        ~F.arrays_overlap(
            F.split(F.regexp_replace(F.lower("message"), r"[^a-zA-Z0-9\s]+", ""), r"\s+"),
            F.array([F.lit(c) for c in unwanted_words])
        )
    )
)

Full example:

columns = ["id","message"]
data = [["ab123","Hello my name is Chris"],["cd345","The room should be 2301"],["ef567","Welcome! What is your name?"],["gh873","That way please"],["kj893","The current year is 2022"]]
df = spark.createDataFrame(data).toDF(*columns)

wanted_words = ['name','room']
unwanted_words = ['welcome','year']

df = df.filter(
    (
        F.arrays_overlap(
            F.split(F.regexp_replace(F.lower("message"), r"[^a-zA-Z0-9\s]+", ""), r"\s+"),
            F.array([F.lit(c) for c in wanted_words])
        )
    )
    &
    (
        ~F.arrays_overlap(
            F.split(F.regexp_replace(F.lower("message"), r"[^a-zA-Z0-9\s]+", ""), r"\s+"),
            F.array([F.lit(c) for c in unwanted_words])
        )
    )
)

[Out]:
+-----+------------------------+
|id   |message                 |
+-----+------------------------+
|ab123|Hello my name is Chris  |
|cd345|The room should be 2301 |
+-----+------------------------+
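
As an aside on the rlike route from the question: with hundreds of words you would not write the pattern by hand but build it from the lists. A minimal sketch (the word-boundary wrapping and the variable names df_rlike, wanted_pattern, unwanted_pattern are my own additions, and whether this ends up faster than the token comparison is something to measure on your data):

import re
import pyspark.sql.functions as F

wanted_words = ['name', 'room']
unwanted_words = ['welcome', 'year']

# Escape each word and join the words into a single alternation; the \b word
# boundaries keep e.g. "year" from matching inside "yearly".
wanted_pattern = r"\b(" + "|".join(re.escape(w) for w in wanted_words) + r")\b"
unwanted_pattern = r"\b(" + "|".join(re.escape(w) for w in unwanted_words) + r")\b"

df_rlike = df.filter(
    F.lower("message").rlike(wanted_pattern)
    & ~F.lower("message").rlike(unwanted_pattern)
)

The arrays_overlap version avoids regex matching entirely, which is why it is the approach shown above.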

You can also pre-compute the tokens once and reuse them for efficiency:

df = df.withColumn("tokens", F.split(F.regexp_replace(F.lower("message"), r"[^a-zA-Z0-9\s]+", ""), r"\s+"))

and use the tokens column in arrays_overlap:

F.arrays_overlap(F.col("tokens"), F.array([F.lit(c) for c in wanted_words]))
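
Putting it together, the filter can then reuse that pre-computed column and drop it afterwards. A small sketch (the drop is optional and only restores the original schema; df_tokens and result are names introduced here for illustration):

df_tokens = df.withColumn(
    "tokens",
    F.split(F.regexp_replace(F.lower("message"), r"[^a-zA-Z0-9\s]+", ""), r"\s+")
)

result = (
    df_tokens
    .filter(
        F.arrays_overlap("tokens", F.array([F.lit(c) for c in wanted_words]))
        & ~F.arrays_overlap("tokens", F.array([F.lit(c) for c in unwanted_words]))
    )
    .drop("tokens")  # remove the helper column again
)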