I am a bit of a novice with pyspark and I could use some guidance. So I'm working with some text data and ultimately I want to get rid of words that either don't appear often enough in the entire corpus, or appear too often.
The data looks something like this with each row containing a sentence:
+--------------------+
|             cleaned|
+--------------------+
|China halfway com...|
|MCI overhaul netw...|
|script kiddy join...|
|look Microsoft Mo...|
|Americans appear ...|
|Oil Eases Venezue...|
|Americans lose be...|
|explosion Echo Na...|
|Bush tackle refor...|
|jail olympic pool...|
|coyote sign RW Jo...|
|home pc key Windo...|
|bomb defuse Blair...|
|Livermore need ...|
|hat ring fast Wi ...|
|Americans dutch s...|
|Insect Vibrations...|
|Britain sleepwalk...|
|Ron Regan Jr Kind...|
|IBM buy danish fi...|
+--------------------+
So essentially I split the strings using split() from pyspark.sql.functions, then count the occurrences of each word, come up with some criteria, and create a list of words that need to be deleted.
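For reference, here is roughly how I build that list (assuming the original DataFrame is called df; the min_count/max_count thresholds below are just placeholders, not my real criteria):

import pyspark.sql.functions as F

# split each sentence into tokens
splitworddf = df.withColumn('split', F.split(F.col('cleaned'), ' '))

# one row per token, then count how often each token appears in the corpus
word_counts = splitworddf.select(F.explode('split').alias('word')).groupBy('word').count()

# words that are too rare or too common go on the removal list
min_count, max_count = 2, 1000  # placeholder thresholds
list_of_words_to_get_rid = [
    row['word']
    for row in word_counts.filter(
        (F.col('count') < min_count) | (F.col('count') > max_count)
    ).collect()
]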
I then use the following functions:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import *

def remove_stop_words(list_of_tokens, list_of_stopwords):
    '''
    A very simple function that takes in a list of word tokens and gets rid of the words that are in the stopwords list
    '''
    return [token for token in list_of_tokens if token not in list_of_stopwords]

def udf_remove_stop_words(list_of_stopwords):
    '''
    Creates a udf that takes in a list of stop words and passes them on to remove_stop_words
    '''
    return udf(lambda x: remove_stop_words(x, list_of_stopwords))

wordsNoStopDF = splitworddf.withColumn('removed', udf_remove_stop_words(list_of_words_to_get_rid)(col('split')))
where list_of_words_to_get_rid is a list of words I'm trying to get rid of, and the input to this pipeline looks as follows:
+--------------------+
|               split|
+--------------------+
|[China, halfway, ...|
|[MCI, overhaul, n...|
|[script, kiddy, j...|
|[look, Microsoft,...|
|[Americans, appea...|
|[Oil, Eases, Vene...|
|[Americans, lose,...|
|[explosion, Echo,...|
|[Bush, tackle, re...|
|[jail, olympic, p...|
+--------------------+
only showing top 10 rows
and the output looks like the following, with the corresponding schema:
+--------------------+--------------------+
|               split|             removed|
+--------------------+--------------------+
|[China, halfway, ...|[China, halfway, ...|
|[MCI, overhaul, n...|[MCI, overhaul, n...|
|[script, kiddy, j...|[script, join, fo...|
|[look, Microsoft,...|[look, Microsoft,...|
|[Americans, appea...|[Americans, appea...|
|[Oil, Eases, Vene...|[Oil, Eases, Vene...|
|[Americans, lose,...|[Americans, lose,...|
|[explosion, Echo,...|[explosion, Echo,...|
|[Bush, tackle, re...|[Bush, tackle, re...|
|[jail, olympic, p...|[jail, olympic, p...|
|[coyote, sign, RW...|[coyote, sign, Jo...|
|[home, pc, key, W...|[home, pc, key, W...|
|[bomb, defuse, Bl...|[bomb, defuse, Bl...|
|[Livermore, , , n...|[Livermore, , , n...|
|[hat, ring, fast,...|[hat, ring, fast,...|
|[Americans, dutch...|[Americans, dutch...|
|[Insect, Vibratio...|[tell, Good, Time...|
|[Britain, sleepwa...|[Britain, big, br...|
|[Ron, Regan, Jr, ...|[Ron, Jr, Guy, , ...|
|[IBM, buy, danish...|[IBM, buy, danish...|
+--------------------+--------------------+
root
|-- split: array (nullable = true)
| |-- element: string (containsNull = true)
|-- removed: string (nullable = true)
So my question is: how do I turn the column removed into an array like split? I'm hoping to use explode to count word occurrences, but I can't seem to quite figure out what to do. I've tried using regexp_replace to get rid of the brackets and then splitting the string with , as the pattern, but that seems to only add a bracket to the column removed.
Is there some change I can make to the functions I'm using so that they return an array of strings like the column split?
Any guidance here would be greatly appreciated!
CodePudding user response:
You haven't defined a return type for your UDF, which is StringType by default; that's why your removed column is a string. You can specify the return type like so:
from pyspark.sql import types as T
udf(lambda x: remove_stop_words(x, list_of_stopwords), T.ArrayType(T.StringType()))
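Applied to your udf_remove_stop_words, that would be something like this (a minimal sketch reusing your names):

from pyspark.sql import functions as F, types as T

def udf_remove_stop_words(list_of_stopwords):
    '''
    Creates a udf that removes the given stop words and returns an array of strings
    '''
    return F.udf(
        lambda x: remove_stop_words(x, list_of_stopwords),
        T.ArrayType(T.StringType())  # explicit return type keeps 'removed' an array
    )

wordsNoStopDF = splitworddf.withColumn(
    'removed',
    udf_remove_stop_words(list_of_words_to_get_rid)(F.col('split'))
)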
CodePudding user response:
You can change the return type of your UDF. However, I'd suggest NOT using a udf at all to remove the words in list_of_words_to_get_rid from the array column split, as you can simply use the Spark built-in function array_except.
Here's an example:
import pyspark.sql.functions as F

df = spark.createDataFrame([("a simple sentence containing some words",)], ["cleaned"])
list_of_words_to_get_rid = ["some", "a"]

wordsNoStopDF = df.withColumn(
    "split",
    F.split("cleaned", " ")
).withColumn(
    "removed",
    F.array_except(
        F.col("split"),
        F.array(*[F.lit(w) for w in list_of_words_to_get_rid])
    )
).drop("cleaned")

wordsNoStopDF.show(truncate=False)
wordsNoStopDF.show(truncate=False)
#+----------------------------------------------+-------------------------------------+
#|split                                         |removed                              |
#+----------------------------------------------+-------------------------------------+
#|[a, simple, sentence, containing, some, words]|[simple, sentence, containing, words]|
#+----------------------------------------------+-------------------------------------+
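One caveat worth knowing: array_except also drops duplicate tokens within each row, so if a sentence repeats a word, the repeats disappear too. Once removed is a true array column, the explode-based counting you mentioned works directly, for example:

word_counts = wordsNoStopDF.select(
    F.explode("removed").alias("word")  # one row per remaining token
).groupBy("word").count()

word_counts.show()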