Given an RDD containing tuples like:
(key1, 0)
(key2, 5)
(key3, 11)
(key1, 44)
(key2, 0)
(key3, 43)
(key1, 0)
(key2, 5)
(key3, 33)
For each key, I wish to compute two counts: the total number of values per key (the regular output of countByKey()) and the number of positive values per key.
So the result would be:
[(key1, 3, 1),
(key2, 3, 2),
(key3, 3, 3)]
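To pin down the target semantics before involving Spark, here is a plain-Python sketch (no PySpark involved; the list `data` simply mirrors the sample tuples above, with the keys written as strings, and `count_by_key` is an illustrative name) that computes the same (key, total, positive) triples:

```python
from collections import defaultdict

# Sample tuples from the question, with the keys written as strings
data = [("key1", 0), ("key2", 5), ("key3", 11),
        ("key1", 44), ("key2", 0), ("key3", 43),
        ("key1", 0), ("key2", 5), ("key3", 33)]

def count_by_key(pairs):
    """Return sorted (key, total_count, positive_count) triples."""
    counts = defaultdict(lambda: [0, 0])
    for key, value in pairs:
        counts[key][0] += 1               # every value counts toward the total
        counts[key][1] += int(value > 0)  # only positive values count here
    return sorted((k, total, pos) for k, (total, pos) in counts.items())

print(count_by_key(data))
# [('key1', 3, 1), ('key2', 3, 2), ('key3', 3, 3)]
```

Note that 0 is not counted as positive, which is why key1 (values 0, 44, 0) ends up with a positive count of 1.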
I would like to use only the map and reduceByKey functions:
def map(value):
    return (value[0], (value[1], 1, 1))
This returns a key-value pair whose value is a triple: the numerical value and two integers used for counting.
def reduce(val1, val2):
    # if value[0] is positive, increment first counter
    # in any case, always increment second counter
    ...
data.map(map).reduceByKey(reduce).collect()
would then return something like:
[(key1, 3, 1),
(key2, 3, 2),
(key3, 3, 3)]
Answer:
You can check whether the value is positive in the map step, assigning 1 if it is > 0 and 0 otherwise. Then reduce by key and sum both counters element-wise:
# map each (key, value) to (key, (1, 1 if value > 0 else 0)), then add the counters per key
rdd1 = data.map(lambda x: (x[0], (1, int(x[1] > 0)))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

for k in rdd1.collect():
    print(k)

#('key1', (3, 1))
#('key2', (3, 2))
#('key3', (3, 3))
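The map/reduceByKey pair above can be checked without a cluster. The sketch below assumes nothing about Spark internals: `local_reduce_by_key` is a hypothetical helper that imitates reduceByKey with a dict and functools.reduce (it is not part of PySpark), and the last step flattens `(key, (total, positive))` into the `(key, total, positive)` shape the question asks for. In PySpark itself, the same flattening could be done with `rdd1.map(lambda kv: (kv[0],) + kv[1])`.

```python
from functools import reduce

# Sample tuples from the question, with the keys written as strings
data = [("key1", 0), ("key2", 5), ("key3", 11),
        ("key1", 44), ("key2", 0), ("key3", 43),
        ("key1", 0), ("key2", 5), ("key3", 33)]

def local_reduce_by_key(pairs, func):
    """Hypothetical stand-in for reduceByKey: group values by key, then fold each group."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return [(key, reduce(func, values)) for key, values in groups.items()]

# Same map and reduce logic as the answer above
mapped = [(x[0], (1, int(x[1] > 0))) for x in data]
reduced = local_reduce_by_key(mapped, lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Flatten (key, (total, positive)) into (key, total, positive)
result = sorted((k,) + v for k, v in reduced)
print(result)
# [('key1', 3, 1), ('key2', 3, 2), ('key3', 3, 3)]
```

The sort is only there to make the output deterministic; collect() on a real RDD gives no ordering guarantee.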
Answer:
If I understood correctly, something like this:
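def map(value):
    # (raw value, total counter, positive counter): check the sign here,
    # because reduce may receive two partial results instead of a raw value
    return (value[0], (value[1], 1, int(value[1] > 0)))

def reduce(val1, val2):
    # val1 and val2 can both be partial results, so add their counters
    # instead of incrementing by 1
    return (val1[0], val1[1] + val2[1], val1[2] + val2[2])

data.map(map).reduceByKey(reduce).collect()
would work. The first element of each resulting triple is just one of the raw values and can be ignored; the second and third elements are the total and positive counts.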
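Spark's reduceByKey merges values locally within each partition and then merges the partial results, so the reduce function must give the same answer however the values are grouped (it must be associative and commutative). The plain-Python sketch below illustrates that requirement under simplified assumptions: `sum_counters` and `reduce_partitions` are illustrative names, not PySpark functions, and the two "partition layouts" are chosen by hand for key1's mapped (total, positive) counters.

```python
from functools import reduce

def sum_counters(val1, val2):
    """Merge two (total, positive) partial counts by adding them."""
    return (val1[0] + val2[0], val1[1] + val2[1])

def reduce_partitions(partitions, func):
    """Reduce each non-empty partition, then merge the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

# Mapped (1, is_positive) counters for key1, whose raw values are 0, 44, 0
key1_values = [(1, 0), (1, 1), (1, 0)]

one_partition = reduce_partitions([key1_values], sum_counters)
two_partitions = reduce_partitions([key1_values[:1], key1_values[1:]], sum_counters)
print(one_partition, two_partitions)
# (3, 1) (3, 1)
```

A reduce that increments a counter by 1 instead of adding both sides would undercount as soon as two multi-element partial results are merged, which is why summing the counters is the safe choice.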