In AWS EMR job flow, does each step receive the output from the previous step?


I am writing a MapReduce program in Java that has 4 steps. Each step operates on the output of the previous step.

So far I have run those steps locally and manually, and I want to start running them on AWS EMR using a job flow.
My professor gave us some code to configure the steps of a job flow, but now we face a problem:

Each of my steps expects input and output paths as the args of its main function. The job flow lets me pass args to each step, but to my understanding each step in a job flow is supposed to receive the output of the previous step automatically.

Does anyone know if this is true? How does the MapReduce app in a step figure out where its input is? Is the path passed to it implicitly as arguments by the job flow?

I am using the AWS SDK for Java 2.x.

My code:

 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                // AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider
                // .create(ProfileCredentialsProvider.create().resolveCredentials());

                EmrClient mapReduce = EmrClient.builder().credentialsProvider(ProfileCredentialsProvider.create())
                                .build();
                List<StepConfig> steps = new LinkedList<StepConfig>();

                HadoopJarStepConfig hadoopJarStepConfig = HadoopJarStepConfig.builder()
                                .jar("s3n://"   myBucketName   "/"   NCount   jarPostfix)
                                .mainClass(packageName   NCount)
                                .args(??????????????????????)
                                .build();
                steps.add(StepConfig.builder().name(NCount).hadoopJarStep(hadoopJarStepConfig)
                                .actionOnFailure("TERMINATE_JOB_FLOW").build());

                HadoopJarStepConfig hadoopJarStepConfig2 = HadoopJarStepConfig.builder()
                                .jar("s3n://"   myBucketName   "/"   CountNrTr   jarPostfix)
                                .mainClass(packageName   CountNrTr)
                                .args(??????????????????????)
                                .build();
                steps.add(StepConfig.builder().name(CountNrTr).hadoopJarStep(hadoopJarStepConfig2)
                                .actionOnFailure("TERMINATE_JOB_FLOW").build());

                HadoopJarStepConfig hadoopJarStepConfig3 = HadoopJarStepConfig.builder()
                                .jar("s3n://"   myBucketName   "/"   JoinAndCalculate   jarPostfix)
                                .mainClass(packageName   JoinAndCalculate)
                                .args(??????????????????????)
                                .build();
                steps.add(StepConfig.builder().name(JoinAndCalculate).hadoopJarStep(hadoopJarStepConfig3)
                                .actionOnFailure("TERMINATE_JOB_FLOW").build());

                HadoopJarStepConfig hadoopJarStepConfig4 = HadoopJarStepConfig.builder()
                                .jar("s3n://"   myBucketName   "/"   ValueToKeySort   jarPostfix)
                                .mainClass(packageName   ValueToKeySort)
                                .args(??????????????????????)
                                .build();
                steps.add(StepConfig.builder().name(ValueToKeySort).hadoopJarStep(hadoopJarStepConfig4)
                                .actionOnFailure("TERMINATE_JOB_FLOW").build());

                JobFlowInstancesConfig instances = JobFlowInstancesConfig.builder()
                                .instanceCount(2)
                                .masterInstanceType("m4.large")
                                .slaveInstanceType("m4.large")
                                .hadoopVersion("3.3.4")
                                .ec2KeyName(myKeyPair)
                                .keepJobFlowAliveWhenNoSteps(false)
                                .placement(PlacementType.builder().availabilityZone("us-east-1a").build()).build();

CodePudding user response:

EMR has nothing to do with the problem. No, it's not automatic.

We would need to see the code of your executed JAR, but I can only assume it's traditional MapReduce code using FileInputFormat, with something like new Path(args[0]). If so, that's likely your input, and new Path(args[1]) would be the output.
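For example, a minimal driver sketch (hypothetical class and job names, with the identity mapper/reducer standing in for your real ones) that shows the step only finds its data through those two args:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class NCount {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "NCount");
            job.setJarByClass(NCount.class);
            job.setMapperClass(Mapper.class);    // identity; substitute your real mapper
            job.setReducerClass(Reducer.class);  // identity; substitute your real reducer
            job.setOutputKeyClass(LongWritable.class);  // matches the identity defaults
            job.setOutputValueClass(Text.class);

            // Nothing is implicit: the step reads and writes exactly these paths.
            FileInputFormat.addInputPath(job, new Path(args[0]));    // input
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }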

Therefore, you'd simply chain those arguments together in each step...

step1 = ...
   .args(new String[] {"/in", "/stage1" })
...
final = ...
   .args(new String[] {"/stageN", "/out" }) 
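Applied to your SDK v2 builders, the chaining might look like this (the /input, /stage1, /stage2 prefixes are placeholders I made up):

    // Hypothetical wiring: step 1 writes to /stage1, which step 2 then reads.
    HadoopJarStepConfig step1Config = HadoopJarStepConfig.builder()
            .jar("s3://" + myBucketName + "/" + NCount + jarPostfix)
            .mainClass(packageName + NCount)
            .args("s3://" + myBucketName + "/input",
                  "s3://" + myBucketName + "/stage1")
            .build();

    HadoopJarStepConfig step2Config = HadoopJarStepConfig.builder()
            .jar("s3://" + myBucketName + "/" + CountNrTr + jarPostfix)
            .mainClass(packageName + CountNrTr)
            .args("s3://" + myBucketName + "/stage1",   // previous step's output
                  "s3://" + myBucketName + "/stage2")
            .build();

Two caveats: each output prefix must not exist before its step runs (FileOutputFormat refuses to overwrite), and on EMR with Hadoop 3.x you generally want the s3:// (EMRFS) scheme rather than the legacy s3n://.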

Alternatively, translate your code to Spark/Flink or a Hive query, where multiple MapReduce stages are handled automatically.
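For instance, a rough Spark sketch in Java (a made-up word-count style pipeline, shown only to illustrate that the intermediate stages need no S3 paths at all):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import static org.apache.spark.sql.functions.desc;

    public class Pipeline {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("Pipeline").getOrCreate();

            Dataset<Row> lines = spark.read().text(args[0]);  // e.g. s3://my-bucket/input

            // Each transformation below stands in for one of your MapReduce steps;
            // Spark chains them into stages without writing intermediate files.
            Dataset<Row> counts = lines
                    .selectExpr("explode(split(value, ' ')) AS word")
                    .groupBy("word").count()
                    .orderBy(desc("count"));

            counts.write().csv(args[1]);                      // e.g. s3://my-bucket/output
            spark.stop();
        }
    }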
