I have a external source file news.txt
of type:
20030249,the old men
20040229,I like the way school and teachers work
20050249,another title goes here for any reason
20060269,text and strings are similar
20070551,cowbows love to ride horses
and a list words.txt
of words:
the
a
school
horses
The following code creates pairs RDDs in the form of
['2003', ['the', 'old', 'men'], ['2004', ['I', 'like', 'the',...
After the following code, I would like to add an RDD pair transformation code to remove the words words.txt
from the values list of the RDD "pair":
source = sc.textFile("news.txt")
stopwords = sc.textFile("words.txt")
pair = source.map(lambda s: [s[0:4],s[9::].split(' ')])
I have tried several in vain but I'm sure I'm close:
pair1 = pair.filter(lambda x: x not in stopwords)
pair1 = pair.map(lambda ws: for w in ws if w not in stopwords)
pair1 = pair.filter(lambda a: a != stopwords)
pair1 = pair.mapValues(lambda x: x not in stopwords)
CodePudding user response:
I assume that the list of stop words in pretty small. In that case, the easiest solution is to collect and then broadcast it. Then you can simply use that list of stop words apply a simple filter on the word lists in your RDD:
stopwords_local = set(stopwords.collect())
stopwords_bc = sc.broadcast(stopwords_local)
result = pair.mapValues(lambda ws : [w for w in ws if not w in stopwords_bc.value])
If the stopwords
rdd is large enough that you cannot or do not want to broadcast it, you could work something out by flattening the first dataframe, left joining with stopwords
, removing the lines for which there is no match and then group by back the data:
result = pair\
.flatMapValues(lambda x:x)\
.map(lambda x: (x[1], x[0]))\
.leftOuterJoin(stopwords.map(lambda x: (x, 1)))\
.filter(lambda x: x[1][1] is None)\
.map(lambda x: (x[1][0], x[0]))\
.groupByKey()\
.mapValues(lambda x: list(x))