Performing two count functions on an RDD in pyspark


Given an RDD with tuples like

(key1, 0)
(key2, 5)
(key3, 11)
(key1, 44)
(key2, 0)
(key3, 43)
(key1, 0)
(key2, 5)
(key3, 33)

For each key, I want to compute two counts: the total number of values per key (the regular output of countByKey()), and the number of positive values per key.

So the result would be like:

[(key1, 3, 1),
(key2, 3, 2),
(key3, 3, 3)]

I would like to use only map and reduceByKey functions:

def map(value):
    return (value[0], (value[1], 1, 1))

This returns a key-value pair, where the value is a triple: the numerical value itself plus two integers used as counters.

def reduce(val1, val2):
    # if value[0] is positive, increment first counter
    # in any case, always increment second counter

data.map(map).reduceByKey(reduce).collect()

The collected output would then be something like:

[(key1, 3, 1),
(key2, 3, 2),
(key3, 3, 3)]

CodePudding user response:

You can check whether the value is positive in the map step and assign 1 if it is greater than 0, else 0. Then reduce by key and sum, like this:

rdd1 = data.map(lambda x: (x[0], (1, int(x[1] > 0)))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

for k in rdd1.collect():
    print(k)

#('key1', (3, 1))
#('key2', (3, 2))
#('key3', (3, 3))
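
For reference, here is a minimal self-contained sketch of the same approach (assuming a local SparkContext and the sample data from the question), with a final map that flattens each record into the (key, total, positives) shape shown in the expected output:

from pyspark import SparkContext

sc = SparkContext("local", "two-counts")  # local context, just for illustration

data = sc.parallelize([
    ("key1", 0), ("key2", 5), ("key3", 11),
    ("key1", 44), ("key2", 0), ("key3", 43),
    ("key1", 0), ("key2", 5), ("key3", 33),
])

result = (data
    .map(lambda x: (x[0], (1, int(x[1] > 0))))             # (key, (count=1, is_positive))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))   # sum both counters per key
    .map(lambda kv: (kv[0], kv[1][0], kv[1][1])))           # flatten to (key, total, positives)

print(sorted(result.collect()))
# [('key1', 3, 1), ('key2', 3, 2), ('key3', 3, 3)]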

CodePudding user response:

If I understood correctly, something like this:

def reduce(val1, val2):
    if val2[0] > 0: # val1 is the accumulator, val2 the new datum
        return (val1[0], val1[1] + 1, val1[2] + 1)
    else:
        return (val1[0], val1[1] + 1, val1[2])

data.map(map).reduceByKey(reduce).collect()

would work.
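
For comparison, the same two counters can also be kept with aggregateByKey, which makes the accumulator style explicit by separating the per-record update from the merge of partial results (reduceByKey, by contrast, may combine two partially merged values). A sketch assuming the same (key, value) records as in the question:

zero = (0, 0)  # (total, positives)

def seq(acc, v):
    # fold one raw value into the running counters
    return (acc[0] + 1, acc[1] + int(v > 0))

def comb(a, b):
    # merge two partial counter pairs
    return (a[0] + b[0], a[1] + b[1])

data.aggregateByKey(zero, seq, comb).collect()
# [('key1', (3, 1)), ('key2', (3, 2)), ('key3', (3, 3))]  (order may vary)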
