[('key1', ['word1', 'word1', 'word2', ...]),
('key2', ['word1', 'word2', 'word2', ...]),
...
]
I want to remove the duplicates in each list and then pair every remaining word with its key.
First step output (duplicates removed from each list):
[('key1', ['word1', 'word2', ...]),
('key2', ['word1', 'word2', ...]),
...
]
Second step output:
[('key1', ('word1', '1')),
('key1', ('word2', '1')),
...
('key2', ('word1', '1')),
('key2', ('word2', '1')),
...
]
Here's my attempt, which does not work:
rdd.map(lambda x: (x[0], (x[1], 1))).collect()
Output:
[('key1', (['word1', 'word1', 'word2', ...], 1)),
('key2', (['word1', 'word2', 'word2', ...], 1)),
...
]
CodePudding user response:
As already pointed out in the comments, you cannot count the entries once the duplicates have been removed, so the counting should happen in the same operation as the deduplication.
Your question is not entirely clear, so I am assuming that you want to transform the RDD
[('key1', ['word1', 'word2', 'word2']),
('key2', ['word1', 'word2', 'word2']),
('key3', ['word1', 'word2', 'word2']),
]
to
[('key1', {'word1': 1, 'word2': 2}),
('key2', {'word1': 1, 'word2': 2}),
('key3', {'word1': 1, 'word2': 2})
]
You could achieve this with the following code snippet:
from typing import List

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

mylist = [('key1', ['word1', 'word2', 'word2']),
          ('key2', ['word1', 'word2', 'word2']),
          ('key3', ['word1', 'word2', 'word2']),
          ]
myrdd = sc.parallelize(mylist)
# Replace each word list with a dict of per-word counts.
# wordcounter has to be defined before an action such as collect() runs.
rdd2 = myrdd.map(lambda x: (x[0], wordcounter(x[1])))
where the function wordcounter could be something like:
def wordcounter(words: List[str]) -> dict:
    """
    Count every unique entry in the list
    """
    out = {}
    for ii in words:
        if ii in out:
            out[ii] = out[ii] + 1
        else:
            out[ii] = 1
    return out
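As a possible shortcut, the same counting can be delegated to collections.Counter from the standard library. The sketch below is only an illustration: the name wordcounter_alt is made up for this example and is not part of the answer above.

```python
from collections import Counter
from typing import Dict, List


def wordcounter_alt(words: List[str]) -> Dict[str, int]:
    """Count every unique entry in the list (hypothetical alternative to wordcounter)."""
    # Counter is a dict subclass; convert it so the result is a plain dict
    return dict(Counter(words))


print(wordcounter_alt(['word1', 'word2', 'word2']))
# {'word1': 1, 'word2': 2}
```

It should be usable in the map call in the same way as wordcounter, since it takes a list of words and returns a dict of counts.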
CodePudding user response:
You can use a combination of map and flatMap. Here's an example:
data_ls = [
    ('key1', ['word1', 'word1', 'word2']),
    ('key2', ['word1', 'word2', 'word2'])
]
# map: drop duplicate words per key; flatMap: emit one (key, (word, 1)) pair per word
spark.sparkContext.parallelize(data_ls). \
    map(lambda r: (r[0], list(set(r[1])))). \
    flatMap(lambda r: [(r[0], (k, 1)) for k in r[1]]). \
    collect()
# Word order within a key may vary, because sets are unordered.
# [('key1', ('word1', 1)),
#  ('key1', ('word2', 1)),
#  ('key2', ('word1', 1)),
#  ('key2', ('word2', 1))]
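To see what the two steps do without a Spark session, here is a minimal plain-Python sketch of the same logic. The helper name explode_unique is hypothetical, and it uses dict.fromkeys instead of set so that the original word order is kept; that is a deliberate deviation from the snippet above, not a requirement.

```python
from typing import List, Tuple

Record = Tuple[str, List[str]]


def explode_unique(record: Record) -> List[Tuple[str, Tuple[str, int]]]:
    """Deduplicate the words of one record and emit one (key, (word, 1)) pair per word."""
    key, words = record
    # dict.fromkeys keeps the first occurrence of each word, in insertion order
    unique_words = dict.fromkeys(words)
    return [(key, (word, 1)) for word in unique_words]


data_ls = [
    ('key1', ['word1', 'word1', 'word2']),
    ('key2', ['word1', 'word2', 'word2']),
]

# Plain-Python stand-in for flatMap: flatten the per-record lists into one list
result = [pair for record in data_ls for pair in explode_unique(record)]
print(result)
# [('key1', ('word1', 1)), ('key1', ('word2', 1)), ('key2', ('word1', 1)), ('key2', ('word2', 1))]
```

With an RDD, passing the same function to flatMap, as in rdd.flatMap(explode_unique), should give the same pairs in a single step.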