I am a beginner with Hadoop. I recently learnt that you can pass files from the command line to a Hadoop job using ToolRunner and the Distributed Cache, and I would like to know how to use this to read in a CSV file. I am mainly confused about what the input directory should be, since the -files option handled by ToolRunner should add the file on the fly, right? Could anyone help?
ToolRunner / Driver
package stubs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class AvgWordLength extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new AvgWordLength(), args);
    System.exit(exitCode);
  }

  @Override
  public int run(String[] args) throws Exception {

    /*
     * Validate that two arguments were passed from the command line.
     */
    if (args.length != 2) {
      System.out.printf("Usage: AvgWordLength <input dir> <output dir>\n");
      System.exit(-1);
    }

    /*
     * Instantiate a Job object for your job's configuration.
     */
    Job job = Job.getInstance(getConf());

    /*
     * Specify the jar file that contains your driver, mapper, and reducer.
     * Hadoop will transfer this jar file to nodes in your cluster running
     * mapper and reducer tasks.
     */
    job.setJarByClass(AvgWordLength.class);

    /*
     * Specify an easily-decipherable name for the job. This job name will
     * appear in reports and logs.
     */
    job.setJobName("Average Word Length");

    /*
     * Specify the paths to the input and output data based on the
     * command-line arguments.
     */
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    /*
     * Specify the mapper and reducer classes.
     */
    job.setMapperClass(LetterMapper.class);
    job.setReducerClass(AverageReducer.class);

    /*
     * The input file and output files are text files, so there is no need
     * to call the setInputFormatClass and setOutputFormatClass methods.
     */

    /*
     * The mapper's output keys and values have different data types than
     * the reducer's output keys and values. Therefore, you must call the
     * setMapOutputKeyClass and setMapOutputValueClass methods.
     */
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    /*
     * Specify the job's output key and value classes.
     */
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    /*
     * Start the MapReduce job and wait for it to finish. If it finishes
     * successfully, return 0. If not, return 1.
     */
    boolean success = job.waitForCompletion(true);
    return (success ? 0 : 1);
  }
}
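For reference, the same CSV can also be placed in the distributed cache programmatically from the driver, instead of (or as well as) passing -files on the command line. This is only a sketch; the HDFS path below is an assumed example, not a path from my setup, and the lines would go inside run() before waitForCompletion:

    // Sketch only: register the side file with the distributed cache from
    // the driver. Requires "import java.net.URI;" at the top of the file.
    // The HDFS path is an assumed example; the part after '#' is the symlink
    // name that each task sees in its local working directory.
    job.addCacheFile(new URI("/user/training/movies_metadata.csv#movies_metadata.csv"));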
Mapper
package stubs;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
/**
 * To define a map function for your MapReduce job, subclass the Mapper class
 * and override the map method. The class definition requires four parameters:
 *
 * @param The data type of the input key - LongWritable
 * @param The data type of the input value - Text
 * @param The data type of the output key - Text
 * @param The data type of the output value - IntWritable
 */
public class LetterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  /**
   * The map method runs once for each line of text in the input file. The
   * method receives:
   *
   * @param A key of type LongWritable
   * @param A value of type Text
   * @param A Context object.
   */
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    /*
     * Convert the line, which is received as a Text object, to a String
     * object.
     */
    String line = value.toString();

    /*
     * Split the CSV line on commas, then split the genre field on '|'.
     * (line.split(",") does not handle quoted fields that contain commas;
     * a real CSV parser would be needed for that.)
     */
    if (line.contains("Colour,")) {
      String[] word = line.split(",");
      String[] genre = word[9].split("\\|");
      // Write each genre with the number of comma-separated fields in the
      // line. Note the loop bound skips the last element of genre[].
      for (int i = 0; i < genre.length - 1; i++) {
        context.write(new Text(genre[i]), new IntWritable(word.length));
      }
    }
  }
}
The Command I Used:
hadoop jar gc.jar stubs.AvgWordLength -files /home/training/training_materials/developer/data/movies_metadata.csv inputs gc_res
inputs is an empty directory. If I don't supply an input directory at all, the whole program just hangs.
CodePudding user response:
The generic options (such as -files) go before the command (jar) and the command options (the class name and your two paths, which end up in the args array).
hadoop -files /home/training/training_materials/developer/data/movies_metadata.csv jar gc.jar stubs.AvgWordLength inputs gc_res
But -files just copies the files into the cluster; it is not the DistributedCache.
In my experience, the cache is mostly used as a place to share data across MapReduce stages, not as a way to skip running hdfs put commands before your code.
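Regardless of where the option goes, a file shipped with -files is localized onto each task node and symlinked into the task's working directory under its base name, so a common pattern is to read it once in the mapper's setup() method. Below is a minimal sketch of that pattern, not your code: the class name CachedCsvMapper is made up, and it assumes the file is distributed as movies_metadata.csv.

package stubs;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CachedCsvMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final List<String> csvLines = new ArrayList<String>();

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // -files (or Job.addCacheFile) symlinks the cached file into the task's
    // current working directory, so it can be opened by its base name.
    BufferedReader reader = new BufferedReader(new FileReader("movies_metadata.csv"));
    try {
      String csvLine;
      while ((csvLine = reader.readLine()) != null) {
        csvLines.add(csvLine);
      }
    } finally {
      reader.close();
    }
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Placeholder use of the cached data: emit the input line together with
    // the number of lines loaded from the cached CSV in setup().
    context.write(new Text(value.toString()), new IntWritable(csvLines.size()));
  }
}

Note that the job still needs a real HDFS input directory for FileInputFormat (args[0] in your driver); the file passed with -files is only a side file for the tasks and never becomes the job's input, which is why an input path has to be given either way.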