How can I remove duplicates and count values for a list in an RDD?


[('key1', ['word1', 'word1', 'word2', ...]),
 ('key2', ['word1', 'word2', 'word2', ...]),
 ...
]

I want to remove the duplicates in each list and keep the result assigned to its key.

First step output: duplicates removed from each list

[('key1', ['word1', 'word2', ...]),
 ('key2', ['word1', 'word2', ...]),
 ...
]

Second step output:

[('key1', ('word1', '1')),
 ('key1', ('word2', '1')),
 ...
 ('key2', ('word1', '1')),
 ('key2', ('word2', '1')),
 ...
]

Here's my attempt, which does not work.

rdd.map(lambda x: (x[0], (x[1], 1))).collect()

Output:

[('key1', (['word1', 'word1', 'word2', ...], 1)),
 ('key2', (['word1', 'word2', 'word2', ...], 1)),
 ...
]

CodePudding user response:

As someone already indicated in the comments above, you cannot count the entries after you have removed the duplicates. Ideally, both should happen in the same operation.
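For illustration with plain Python (outside Spark): collapsing a list to a set throws away the multiplicities, so any counting has to be done against the original list:

words = ['word1', 'word1', 'word2']
set(words)                                # {'word1', 'word2'} - duplicate information is gone
{w: words.count(w) for w in set(words)}   # {'word1': 2, 'word2': 1} - count and deduplicate together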

Your question is not entirely clear. Hence, I am assuming that you want to transform the RDD

[('key1', ['word1', 'word2', 'word2']),
 ('key2', ['word1', 'word2', 'word2']),
 ('key3', ['word1', 'word2', 'word2']),
]

to

[('key1', {'word1': 1, 'word2': 2}), 
 ('key2', {'word1': 1, 'word2': 2}), 
 ('key3', {'word1': 1, 'word2': 2})
]

You could achieve this with the following code snippet:

from typing import List

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

mylist = [('key1', ['word1', 'word2', 'word2']),
          ('key2', ['word1', 'word2', 'word2']),
          ('key3', ['word1', 'word2', 'word2']),
         ]
myrdd = sc.parallelize(mylist)
rdd2 = myrdd.map(lambda x: (x[0], wordcounter(x[1])))

where the function wordcounter could be something like:

def wordcounter(words: List[str]) -> dict:
    """
    Count every unique entry in the list
    """
    out = {}
    for ii in words:
        if ii in out:
            out[ii] = out[ii] + 1
        else:
            out[ii] = 1
    return out
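On the sample data, rdd2.collect() should then return the per-key dictionaries shown above. As a side note, the standard library's collections.Counter does the same counting, so wordcounter could be replaced with a one-liner (a minimal alternative sketch, not part of the original answer):

from collections import Counter

rdd2 = myrdd.map(lambda x: (x[0], dict(Counter(x[1]))))
rdd2.collect()
# [('key1', {'word1': 1, 'word2': 2}),
#  ('key2', {'word1': 1, 'word2': 2}),
#  ('key3', {'word1': 1, 'word2': 2})]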

CodePudding user response:

You can use a combination of map and flatMap.

Here's an example:

data_ls = [
    ('key1', ['word1', 'word1', 'word2']),
    ('key2', ['word1', 'word2', 'word2'])
]

spark.sparkContext.parallelize(data_ls). \
    map(lambda r: (r[0], list(set(r[1])))). \
    flatMap(lambda r: [(r[0], (k, 1)) for k in r[1]]). \
    collect()

# [('key1', ('word1', 1)),
#  ('key1', ('word2', 1)),
#  ('key2', ('word1', 1)),
#  ('key2', ('word2', 1))]
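
If the (word, 1) pairs are only an intermediate step towards counting how often each word occurs per key, a variation on the same idea skips the deduplication and aggregates with reduceByKey instead (a sketch, assuming that is the end goal):

spark.sparkContext.parallelize(data_ls). \
    flatMap(lambda r: [((r[0], w), 1) for w in r[1]]). \
    reduceByKey(lambda a, b: a + b). \
    map(lambda r: (r[0][0], (r[0][1], r[1]))). \
    collect()

# [('key1', ('word1', 2)),
#  ('key1', ('word2', 1)),
#  ('key2', ('word1', 1)),
#  ('key2', ('word2', 2))]   (order may vary)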