I have two dataframes in PySpark, both with string columns. The first, df1, holds a single word per row, while the second, df2, holds strings of words, i.e. sentences. I need to check whether each value in the df1 column occurs in the df2 column. For example, df2:
+---+------+------+-----------------+
|age|height|  name|        Sentences|
+---+------+------+-----------------+
| 10|    80| Alice|   'Grace, Sarah'|
| 15|  null|   Bob|          'Sarah'|
| 12|  null|   Tom|'Amy, Sarah, Bob'|
| 13|  null|Rachel|       'Tom, Bob'|
+---+------+------+-----------------+
And df1:
+-------+
|  token|
+-------+
|  'Ali'|
|'Sarah'|
|  'Bob'|
|  'Bob'|
+-------+
So, how can I search for each token of df1 in the Sentences column of df2? I need a count for each word, added as a new column in df1.
I have tried this solution, but it only works for a single word, not for a complete dataframe column.
CodePudding user response:
You could use a PySpark UDF to create the new column in df1. The problem is that you cannot access a second dataframe inside a UDF (see the linked question).
As advised in the referenced question, you could collect the sentences and pass them to the UDF as a broadcast variable.
Here is a working example:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Reuse the active session if there is one (e.g. in a notebook or the pyspark shell)
spark = SparkSession.builder.getOrCreate()

# Instantiate df2
cols = ["age", "height", "name", "Sentences"]
data = [
    (10, 80, "Alice", "Grace, Sarah"),
    (15, None, "Bob", "Sarah"),
    (12, None, "Tom", "Amy, Sarah, Bob"),
    (13, None, "Rachel", "Tom, Bob"),
]
df2 = spark.createDataFrame(data).toDF(*cols)

# Instantiate df1
cols = ["token"]
data = [
    ("Ali",),
    ("Sarah",),
    ("Bob",),
    ("Bob",),
]
df1 = spark.createDataFrame(data).toDF(*cols)

# Collect the Sentences column of df2 to the driver and broadcast it to the executors
lstSentences = [row[0] for row in df2.select("Sentences").collect()]
sentences = spark.sparkContext.broadcast(lstSentences)

def countWordInSentence(word):
    # Count the sentences that contain the word (substring match),
    # reading the list from the broadcast variable
    return sum(1 for item in sentences.value if word in item)

func_udf = udf(countWordInSentence, IntegerType())
df1 = df1.withColumn("COUNT", func_udf(df1["token"]))
df1.show()
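Note that `word in item` is a substring test, so a token such as "Ali" would also match a sentence containing "Alice". On the sample data this makes no difference, but if whole-name matches are wanted, the sentence can be split on commas first. A minimal plain-Python sketch of the two behaviours (no Spark needed; the function names are illustrative and not part of the answer above):

```python
def count_substring(word, sentences):
    # Same test as the UDF: counts sentences where word occurs anywhere
    return sum(1 for s in sentences if word in s)

def count_exact(word, sentences):
    # Split each sentence on commas and compare whole, stripped names
    return sum(
        1 for s in sentences
        if word in [name.strip() for name in s.split(",")]
    )

sentences = ["Grace, Sarah", "Sarah", "Amy, Sarah, Bob", "Tom, Bob"]

print(count_substring("Ali", ["Alice, Bob"]))  # substring match counts "Alice"
print(count_exact("Ali", ["Alice, Bob"]))      # exact match does not
print(count_exact("Sarah", sentences))
```

The body of `count_exact` could be used as the UDF body, with the collected sentence list passed in place of `sentences`.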
CodePudding user response:
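Using the dataframes from the previous answer:
from pyspark.sql.functions import coalesce, explode, lit, split, trim

# One row per name: split each sentence on commas and strip the whitespace
df3 = df2.select(explode(split("Sentences", ",")).alias("friends"))
df3 = df3.withColumn("friends", trim("friends"))
# Count each name before joining, so the duplicate 'Bob' in df1 does not inflate the counts
counts = df3.groupBy("friends").count()
# A left join keeps every df1 row; tokens that never occur get 0
result = df1.join(counts, df1["token"] == counts["friends"], how="left").select(
    df1["token"], coalesce(counts["count"], lit(0)).alias("COUNT")
)
result.show()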
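One thing to watch with an explode-and-join approach: df1 contains 'Bob' twice, so joining before aggregating counts every match once per duplicate token row, and an inner join drops tokens that never occur. The difference between the two orderings can be sketched in plain Python on the sample data (no Spark needed; the variable names are illustrative):

```python
from collections import Counter

sentences = ["Grace, Sarah", "Sarah", "Amy, Sarah, Bob", "Tom, Bob"]
tokens = ["Ali", "Sarah", "Bob", "Bob"]

# Equivalent of explode(split(...)) followed by trim: one entry per name
names = [name.strip() for s in sentences for name in s.split(",")]

# Inner join first, then group: each match is counted once per token row
join_then_group = Counter(t for t in tokens for n in names if t == n)

# Group first, then left join: one count per name, 0 when a token never occurs
name_counts = Counter(names)
group_then_join = [(t, name_counts.get(t, 0)) for t in tokens]

print(join_then_group)
print(group_then_join)
```

With the first ordering 'Bob' is counted four times and 'Ali' is missing from the result. With the second, every row of df1 gets the number of sentences that mention it.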