PySpark - Find a sub-string from a column of one dataframe in another dataframe

I have two different PySpark dataframes of string type. The first dataframe contains single words, while the second contains strings of words, i.e., sentences. I have to check for the existence of each value from the first dataframe's column in the second dataframe's column. For example, df2:

    +---+------+-------+-----------------+
    |age|height|   name|        Sentences|
    +---+------+-------+-----------------+
    | 10|    80|  Alice|   'Grace, Sarah'|
    | 15|  null|    Bob|          'Sarah'|
    | 12|  null|    Tom|'Amy, Sarah, Bob'|
    | 13|  null| Rachel|       'Tom, Bob'|
    +---+------+-------+-----------------+

The second dataframe, df1:

+-------+
|  token|
+-------+
|  'Ali'|
|'Sarah'|
|  'Bob'|
|  'Bob'|
+-------+

So, how can I search for each token of df1 in the df2 Sentences column? I need the count for each word, added as a new column in df1.

I have tried a solution, but it only works for a single word, i.e., not for a complete dataframe column.
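
For reference, a single-word check like the one described can be written with the built-in Column.contains; this sketch (the word 'Sarah' is just an illustration) shows why it does not scale to a whole column of tokens:

from pyspark.sql import functions as F

# Single-word version: counts the sentences containing one fixed word.
# It cannot iterate over the whole token column of df1.
df2.filter(F.col("Sentences").contains("Sarah")).count()  # 3 for the sample data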

CodePudding user response:

You could use a PySpark UDF to create the new column in df1. The problem is that you cannot access a second dataframe inside a UDF (see here).

As advised in the referenced question, you could pass the sentences around as a broadcast variable.

Here is a working example:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# Instantiate df2
cols = ["age", "height", "name", "Sentences"]
data = [
        (10, 80, "Alice", "Grace, Sarah"),
        (15, None, "Bob", "Sarah"),
        (12, None, "Tom", "Amy, Sarah, Bob"),
        (13, None, "Rachel", "Tom, Bob")
        ]

df2 = spark.createDataFrame(data).toDF(*cols)

# Instantiate df1
cols = ["token"]
data = [
        ("Ali",),
        ("Sarah",),
        ("Bob",),
        ("Bob",)
        ]

df1 = spark.createDataFrame(data).toDF(*cols)

# Create a broadcast variable from the Sentences column of df2
lstSentences = [row[0] for row in df2.select('Sentences').collect()]
sentences = spark.sparkContext.broadcast(lstSentences)

def countWordInSentence(word):
    # Count the sentences that contain the word, reading from the broadcast value
    return sum(1 for item in sentences.value if word in item)

func_udf = udf(countWordInSentence, IntegerType())
df1 = df1.withColumn("COUNT", func_udf(df1["token"]))
df1.show()
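
For the sample data, df1.show() should print something like this (Sarah occurs in three sentences, Bob in two, Ali in none):

+-----+-----+
|token|COUNT|
+-----+-----+
|  Ali|    0|
|Sarah|    3|
|  Bob|    2|
|  Bob|    2|
+-----+-----+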

CodePudding user response:

Considering the dataframes from the previous answer:

from pyspark.sql.functions import explode, split, length, trim

# One row per (sentence, name) pair: split on commas, explode, and trim whitespace
df3 = df2.select('Sentences', explode(split('Sentences', ',')).alias('friends'))
df3 = df3.withColumn("friends", trim("friends")).withColumn("length_of_friends", length("friends"))
df3.show()  # display(df3) on Databricks

# Keep only the names that appear as tokens in df1, then count each one
df3 = df3.join(df1, df1.token == df3.friends, how='inner').groupby('friends').count()

df3.show()  # display(df3) on Databricks
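
If the count should end up as a new column on df1, as asked, here is a minimal sketch building on the same idea (it assumes the df1/df2 names above; a left join plus coalesce keeps tokens such as 'Ali' that match no sentence):

from pyspark.sql.functions import explode, split, trim, coalesce, lit

# One row per name mentioned in any sentence
exploded = df2.select(explode(split('Sentences', ',')).alias('friends')) \
              .withColumn('friends', trim('friends'))

# Number of sentences mentioning each distinct name
counts = exploded.groupBy('friends').count()

# Attach the count to every token of df1; unmatched tokens get 0
result = df1.join(counts, df1.token == counts.friends, how='left') \
            .select(df1.token, coalesce(counts['count'], lit(0)).alias('COUNT'))
result.show()

Note that the duplicate 'Bob' row in df1 would inflate the groupby counts in the inner-join version above, which is why this sketch counts on the exploded sentences first and joins back to df1 afterwards.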