I wrote a MapReduce program to compute the matrix operation "X-MN", where M, N, X are matrices with integer values. In order to do that I need a list of lists. For instance:
M=[[1,2,3],[4,5,6],[7,8,9]] which represents a matrix of 3x3.
During some troubleshooting I found the following:
test=[1,2]
test2=[4,5]
test.append(test2) #this works
a=test[0]
b=test[1]
c=test[2] #this also works
print(c)
output => [4,5]
So, up to this point everything seems to be fine.
However, the problem appears in my actual mapper.py program, shown below.
#!/usr/bin/env python3
import sys

row_m, col_m = map(int, sys.argv[1:])
row_n = col_m
col_n = row_m

all_matrices = list()
for line in sys.stdin:
    all_matrices.append(line)

M_values = list()
N_values = list()
X_values = list()
for line in all_matrices:
    elements = line.rstrip().split(", ")
    index = elements[0]
    row_index = elements[1]
    row_values = elements[2:]
    if index == "1":
        M_values.append(row_values)
    elif index == "2":
        N_values.append(row_values)
    elif index == "3":
        X_values.append(row_values)
print(N_values)    # this works
print(N_values[0]) # this exact command does not work and causes the errors below,
                   # and I need it in order for my logic to work
# without the "N_values[0]" command, the above code works just fine
Basically, up to this point I have only read the mapper input data from stdin and stored it in 3 different lists, and when I try to recover a matrix row (an element list) it fails.
Here is my run.sh file that executes my mapper Python program.
#!/bin/bash
arg1=2
arg2=2
hadoop jar ~/hadoop-streaming-3.1.4.jar \
    -D mapred.reduce.tasks=0 \
    -file ./mapper.py \
    -mapper "./mapper.py $arg1 $arg2" \
    -input /matrices \
    -output /output
The matrices folder contains 3 txt files, one for each matrix. This is my mapper input data, and I am sure I can read it, since without the problematic command I am able to see the data in "N_values".
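For reference, given how the mapper splits each line on ", " and reads a matrix index, a row index, and then the row values, each line in those txt files looks roughly like this (the numbers here are made up for illustration):
1, 0, 1, 2
1, 1, 3, 4
2, 0, 7, 9
2, 1, 4, 2
where the first field tags the matrix (1 = M, 2 = N, 3 = X) and the second field is the row index.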
Here are the errors:
22/09/15 18:14:13 INFO mapreduce.Job: Running job: job_1663260914490_0011
22/09/15 18:14:21 INFO mapreduce.Job: Job job_1663260914490_0011 running in uber mode : false
22/09/15 18:14:21 INFO mapreduce.Job: map 0% reduce 0%
22/09/15 18:14:32 INFO mapreduce.Job: Task Id : attempt_1663260914490_0011_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:461)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1926)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
.
.
.
22/09/15 18:15:14 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!
Additionally, I executed my program directly on the Hadoop server to make sure it is not a syntax issue, and it actually works. Test as follows: matrix N_values was [[7,9],[4,2]]
[xxxx]$ cat matrix.txt | python mapper.py 2 2
[['7', '9'], ['4', '2']]
[4, 5]
['7', '9']
So I do not think this is a problem with my Python code. Maybe it is an issue with the commands in the run.sh file, since I am a newbie working with Hadoop, or with the .jar version I am using.
I hope someone can help me resolve this problem. It would be much appreciated.
Answer:
Well, I found that the main problem here wasn't the code syntax; it was more of a design issue. It is my understanding that Hadoop's map-reduce approach processes the input data, the mapper output, and the reducer input in a distributed way, since there can be more than one mapper and reducer. In my case the input was split across mapper tasks, so a given mapper did not necessarily receive the rows of all three matrices; a mapper whose "N_values" list ended up empty crashed on "N_values[0]", which is why the subprocess failed with code 1. The logs were misleading, and once I corrected my code so that the necessary information is always available at the moment it needs to be processed, it works as expected. This is something I was not used to doing when just writing shell scripts with Python, and it made the difference. Hopefully this can help others avoid wasting time wondering why Hadoop does not like code that runs fine locally LoL
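As a rough illustration (this is a minimal sketch rather than my exact final mapper), the safer pattern is to have each mapper emit only the rows it actually received, keyed by matrix and row index, and leave the combination into X-MN to a later stage that sees all the data:
#!/usr/bin/env python3
# Sketch of a mapper that never assumes all three matrices are in its own
# input split: it just tags and re-emits whatever rows it was given.
import sys

for line in sys.stdin:
    elements = line.rstrip().split(", ")
    if len(elements) < 3:          # skip blank or malformed lines
        continue
    matrix_index = elements[0]     # "1" = M, "2" = N, "3" = X
    row_index = elements[1]
    row_values = elements[2:]
    # key = "matrix,row", value = comma-separated row values; a reducer
    # (or a later step that sees the whole dataset) can group these and
    # compute X - MN.
    print("%s,%s\t%s" % (matrix_index, row_index, ",".join(row_values)))
The key point is that any indexing like N_values[0] has to happen at a stage that is guaranteed to have received all of N, not inside an arbitrary mapper task.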