Spark keeping words in column that match a list

Time:05-31

I currently have a list and a Spark dataframe:

['murder', 'violence', 'flashback', 'romantic', 'cult', 'revenge', 'psychedelic', 'comedy', 'suspenseful', 'good versus evil']

Spark data frame

I am having a tough time figuring out a way to create a new column in the dataframe that takes the first matching word from the tags column for each row and puts it in the newly created column for that row.

For example, let's say the first row in the tags column has only "murder" in it; I would want that to show in the new column. Then, if the next row had "boring", "silly" and "cult" in it, I would want it to show "cult" in the new column, since it matches the list. If the last row in the tags column had "revenge" and "cult" in it, I would want it to show only "revenge", since it's the first word that matches the list.

CodePudding user response:

from pyspark.sql import functions as F
df = spark.createDataFrame([('murder',), ('boring silly cult',), ('revenge cult',)], ['tags'])

mylist = ['murder', 'violence', 'flashback', 'romantic', 'cult', 'revenge', 'psychedelic', 'comedy', 'suspenseful', 'good versus evil']
pattern = '|'.join([f'({x})' for x in mylist])

df = df.withColumn('first_from_list', F.regexp_extract('tags', pattern, 0))

df.show()
# +-----------------+---------------+
# |             tags|first_from_list|
# +-----------------+---------------+
# |           murder|         murder|
# |boring silly cult|           cult|
# |     revenge cult|        revenge|
# +-----------------+---------------+
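
One caveat with the pattern above: it has no word boundaries, so 'cult' would also match inside a word like 'culture'. Here is a minimal sketch of a stricter pattern, assuming the list entries contain no regex metacharacters. It is checked with Python's re module rather than Spark, and build_pattern and first_from_list are hypothetical helper names:

```python
import re

mylist = ['murder', 'violence', 'flashback', 'romantic', 'cult', 'revenge',
          'psychedelic', 'comedy', 'suspenseful', 'good versus evil']

def build_pattern(words):
    # Hypothetical helper: wrap the alternation in \b so only whole words match.
    # Assumes the words contain no regex metacharacters.
    return r'\b(?:' + '|'.join(words) + r')\b'

def first_from_list(tags, pattern):
    # Plain-Python stand-in for regexp_extract(..., 0): leftmost match, or ''.
    m = re.search(pattern, tags)
    return m.group(0) if m else ''

pattern = build_pattern(mylist)
original = '|'.join(f'({x})' for x in mylist)  # pattern from the answer above

print(first_from_list('boring silly cult', pattern))  # cult
print(first_from_list('pop culture', pattern))        # (empty string)
print(first_from_list('pop culture', original))       # cult
```

The same pattern string could then be passed to F.regexp_extract('tags', pattern, 0). Spark evaluates it with Java regular expressions, which also support \b and (?:...).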

CodePudding user response:

You could use a PySpark UDF (User Defined Function).

First, let's write a Python function that finds the first match between a list (in this case the list you provided) and a string, that is, the value of the tags column:

def find_first_match(tags):
  genres = ['murder', 'violence', 'flashback', 'romantic', 'cult', 'revenge', 'psychedelic', 'comedy', 'suspenseful', 'good versus evil']
  # Scan the tags left to right and stop at the first one found in the list.
  for tag in tags.split():
    if tag in genres:
      return tag
  # No tag matched.
  return ''

Then, we need to convert this function into a PySpark udf so that we can use it in combination with the .withColumn() operation:

from pyspark.sql.functions import udf
find_first_matchUDF = udf(find_first_match)  # return type defaults to StringType

Now we can apply the udf function to generate a new column. Assuming df is the name of your DataFrame:

from pyspark.sql.functions import col

new_df = df.withColumn("first_match", find_first_matchUDF(col("tags")))

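This approach only works if all tags in your tags column are separated by whitespace. A multi-word entry such as 'good versus evil' can therefore never match, because split() breaks it into separate words.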
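
If the tags are separated by something other than whitespace, the splitting step has to change. Here is a minimal sketch in plain Python, assuming comma-separated tags (the question does not show the actual format). The names find_first_match_sep and sep are hypothetical, and None is handled because a Python UDF receives None for null rows:

```python
GENRES = ['murder', 'violence', 'flashback', 'romantic', 'cult', 'revenge',
          'psychedelic', 'comedy', 'suspenseful', 'good versus evil']

def find_first_match_sep(tags, sep=','):
    # Hypothetical variant: split on an explicit separator instead of whitespace.
    # Null rows arrive as None inside a UDF, so pass them through unchanged.
    if tags is None:
        return None
    for tag in tags.split(sep):
        tag = tag.strip()  # drop spaces around each tag
        if tag in GENRES:
            return tag     # stop at the first match
    return ''

print(find_first_match_sep('boring, silly, cult'))     # cult
print(find_first_match_sep('good versus evil, cult'))  # good versus evil
```

It could be wrapped with udf(find_first_match_sep) and applied with .withColumn() in the same way as above.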

P.S

You can avoid the second step by using udf as a decorator:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def find_first_match(tags):
  genres = ['murder', 'violence', 'flashback', 'romantic', 'cult', 'revenge', 'psychedelic', 'comedy', 'suspenseful', 'good versus evil']
  # Return as soon as a tag matches, so the first match in the row wins.
  for tag in tags.split():
    if tag in genres:
      return tag
  # No tag matched.
  return ''

new_df = df.withColumn("first_match", find_first_match(col("tags")))
