I have a .csv source file in the form of:
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,30.95,1,MATT,MORAL,CUREPIPE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1, MATT,MORAL, CUREPIPE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,89.95,1,LELA,SMI,HASSEE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,54.50,1,LELA,SMI,HASSEE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,2,TOM, SON,FLACQ
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD,PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,1,DYDY,ARD, PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD, PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,2,TAY,ANA,VACOAS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,35.00,3,TAY,ANA,VACOAS
I would like to calculate the weighted average cost (sum of price × qty ÷ total qty) for each person using a combiner in MapReduce, with the following result:
MATT MORAL 25.45
LELA SMI 72.225
TOM SON 19.95
DYDY ARD 20.63
TAY ANA 29.8
So I came up with the following code, which is not working (it gives me double the average). I feel like I need to add an IF/ELSE statement in the reducer to process the output of the combiner (unique keys) differently from the output of the mapper (duplicated keys):
from mrjob.job import MRJob

class Job(MRJob):
    def mapper(self, key, value):
        words = value.strip().split(',')
        full_name = words[-3] + ' ' + words[-2]
        price, qty = float(words[-5]), int(words[-4])
        yield full_name, (price, qty)

    def combiner(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0] * value[1]
            totalqty += value[1]
        yield key, (totalprice, totalqty)

    def reducer(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0] * value[1]
            totalqty += value[1]
        average = round(totalprice / totalqty, 2)
        yield key, average

if __name__ == '__main__':
    Job.run()
Grateful if you could give me some guidance with the reducer!
CodePudding user response:
You shouldn't be weighting totalprice by the quantity in the reducer, since you have already done that in the combiner:
def reducer(self, key, values):
    totalprice, totalqty = 0, 0
    for value in values:
        totalprice += value[0]
        totalqty += value[1]
    average = round(totalprice / totalqty, 2)
    yield key, average
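To see the double counting concretely, here is a hand trace of the TOM SON record (price 19.95, qty 2, the only values not invented here) through the original job versus the corrected reducer:

```python
# Trace of the TOM SON record: mapper emits (price, qty).
price, qty = 19.95, 2
comb = (price * qty, qty)     # combiner output: (39.90, 2)

# The original reducer multiplies by qty AGAIN, doubling the result:
buggy = round(comb[0] * comb[1] / comb[1], 2)   # 39.9
# The corrected reducer just divides weighted price by total qty:
fixed = round(comb[0] / comb[1], 2)             # 19.95
```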
Some more explanation
Here is what the Hadoop docs say about using a "Combiner" -
Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
Introducing a "Combiner" into the mix works if your reduce operation can be broken down to multiple "mini reduces" without changing the end result.
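This "mini reduces" condition is exactly why the (weighted price, qty) pair trick works: sums are associative, so partial sums combine correctly, while plain averages are not. A small standalone sketch of the distinction (the numbers here are arbitrary, not from the question):

```python
data = [10.0, 20.0, 30.0, 40.0]

# Sums decompose: combining partial sums gives the same total.
assert sum(data) == sum(data[:1]) + sum(data[1:])

# Averages do not: averaging partial averages is wrong in general.
overall = sum(data) / len(data)                        # 25.0
partial = (sum(data[:1]) / 1 + sum(data[1:]) / 3) / 2  # 20.0

# Carrying (sum, count) pairs through the combiner fixes this.
pairs = [(sum(data[:1]), 1), (sum(data[1:]), 3)]
total = sum(s for s, _ in pairs)
count = sum(c for _, c in pairs)
assert total / count == overall
```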
If you want your combiner and reducer functions to be the same, then you need to make a change in the mapper function instead -
Something like this -
from mrjob.job import MRJob

class Job(MRJob):
    def mapper(self, key, value):
        words = value.strip().split(',')
        full_name = words[-3] + ' ' + words[-2]
        price, qty = float(words[-5]), int(words[-4])
        price = price * qty  # This is the change
        yield full_name, (price, qty)

    def combiner(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0]  # And change here
            totalqty += value[1]
        yield key, (totalprice, totalqty)

    def reducer(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0]  # Change here
            totalqty += value[1]
        average = round(totalprice / totalqty, 2)
        yield key, average

if __name__ == '__main__':
    Job.run()
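You can check the logic without Hadoop or mrjob at all: the sketch below simulates the mapper and the shuffle in plain Python on the (name, price, qty) triples from the question's CSV, assuming an in-memory grouping step in place of the real framework:

```python
from collections import defaultdict

# (name, price, qty) triples transcribed from the question's sample rows.
rows = [
    ("MATT MORAL", 30.95, 1), ("MATT MORAL", 19.95, 1),
    ("LELA SMI", 89.95, 1), ("LELA SMI", 54.50, 1),
    ("TOM SON", 19.95, 2),
    ("DYDY ARD", 19.95, 1), ("DYDY ARD", 22.00, 1), ("DYDY ARD", 19.95, 1),
    ("TAY ANA", 22.00, 2), ("TAY ANA", 35.00, 3),
]

# Mapper: pre-weight the price so every later step only ever sums.
mapped = [(name, (price * qty, qty)) for name, price, qty in rows]

# Shuffle: group pairs by key (what Hadoop does between map and reduce).
grouped = defaultdict(list)
for name, pair in mapped:
    grouped[name].append(pair)

# Combiner/reducer: both reduce to (sum of weighted prices, sum of qtys).
averages = {}
for name, pairs in grouped.items():
    totalprice = sum(p for p, _ in pairs)
    totalqty = sum(q for _, q in pairs)
    averages[name] = round(totalprice / totalqty, 2)
# e.g. averages["TOM SON"] == 19.95 and averages["TAY ANA"] == 29.8
```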