How do I read a CSV file with the Distributed Cache and ToolRunner on the command line in Hadoop


I am a beginner to Hadoop. I recently learned that you can pass files to a job on the command line using ToolRunner and the Distributed Cache, and I want to know how to use this to read in a CSV file. I am mainly confused about what the input directory should be, since the -files option handled by ToolRunner is supposed to add files on the fly, right? Could anyone help?

ToolRunner / Driver

package stubs;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvgWordLength extends Configured implements Tool {

    public static void main(String[] args) throws Exception {

        int exitCode = ToolRunner.run(new Configuration(), new AvgWordLength(), args);
        System.exit(exitCode);

    }

    @Override
    public int run(String[] args) throws Exception {
        /*
         * Validate that two arguments were passed from the command line.
         */
        if (args.length != 2) {
            System.out.printf("Usage: AvgWordLength <input dir> <output dir>\n");
            System.exit(-1);
        }

        /*
         * Instantiate a Job object for your job's configuration.
         */
        Job job = Job.getInstance(getConf());

        /*
         * Specify the jar file that contains your driver, mapper, and reducer.
         * Hadoop will transfer this jar file to nodes in your cluster running
         * mapper and reducer tasks.
         */
        job.setJarByClass(AvgWordLength.class);

        /*
         * Specify an easily-decipherable name for the job. This job name will
         * appear in reports and logs.
         */
        job.setJobName("Average Word Length");

        /*
         * Specify the paths to the input and output data based on the
         * command-line arguments.
         */
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /*
         * Specify the mapper and reducer classes.
         */
        job.setMapperClass(LetterMapper.class);
        job.setReducerClass(AverageReducer.class);

        /*
         * The input file and output files are text files, so there is no need
         * to call the setInputFormatClass and setOutputFormatClass methods.
         */

        /*
         * The mapper's output keys and values have different data types than
         * the reducer's output keys and values. Therefore, you must call the
         * setMapOutputKeyClass and setMapOutputValueClass methods.
         */
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        /*
         * Specify the job's output key and value classes.
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        /*
         * Start the MapReduce job and wait for it to finish. If it finishes
         * successfully, return 0. If not, return 1.
         */
        boolean success = job.waitForCompletion(true);
        return (success ? 0 : 1);
    }
}

Mapper

package stubs;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;

/**
 * To define a map function for your MapReduce job, subclass the Mapper class
 * and override the map method. The class definition requires four parameters:
 *
 * @param The data type of the input key - LongWritable
 * @param The data type of the input value - Text
 * @param The data type of the output key - Text
 * @param The data type of the output value - IntWritable
 */
public class LetterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {


    /**
     * The map method runs once for each line of text in the input file. The
     * method receives:
     *
     * @param A key of type LongWritable
     * @param A value of type Text
     * @param A Context object.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        /*
         * Convert the line, which is received as a Text object, to a String
         * object.
         */
        String line = value.toString();

        /*
         * The line.split(",") call splits the comma-separated line into its
         * fields; word[9].split("\\|") then splits the genre field on the "|"
         * separator. If you are not familiar with the use of regular
         * expressions in Java code, search the web for "Java Regex Tutorial."
         */
        if (line.contains("Colour,")) {
            String[] word = line.split(",");
            String[] genre = word[9].split("\\|");
            for (int i = 0; i < genre.length - 1; i++) {
                context.write(new Text(genre[i]), new IntWritable(word.length));
            }
        }
    }


}

The Command I Used:

hadoop jar gc.jar stubs.AvgWordLength -files /home/training/training_materials/developer/data/movies_metadata.csv inputs gc_res

inputs is an empty directory. If I don't pass an input directory at all, the entire program hangs.

CodePudding user response:

The generic options (such as -files) go before the command (jar) and the command options (the class name and your two paths, which end up in the args array).

hadoop -files /home/training/training_materials/developer/data/movies_metadata.csv jar gc.jar stubs.AvgWordLength inputs gc_res

But -files just copies the files into the cluster; it is not the DistributedCache.

In my experience, the cache is mostly used as a place to share data across MapReduce stages, not as a way to skip running hdfs put before your code.
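
For what it's worth, a file shipped with -files is normally made available in each task's working directory under its base name, so a mapper can open it as an ordinary local file in setup(). The sketch below is only an illustration, not code from the question or from the answer above: the class name CachedCsvMapper is made up, it assumes the side file keeps the name movies_metadata.csv, and the map() body is a placeholder. The job still needs a real HDFS input directory for FileInputFormat; -files only distributes the side file, it does not feed records to map().

package stubs;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CachedCsvMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Rows of the side-data CSV, loaded once per task in setup().
    private final List<String[]> movieRows = new ArrayList<String[]>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // A file passed with -files is placed in the task's working directory
        // under its base name, so it can be opened like a local file.
        BufferedReader reader = new BufferedReader(new FileReader("movies_metadata.csv"));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                movieRows.add(line.split(","));
            }
        } finally {
            reader.close();
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // map() still processes records from the job's normal HDFS input path
        // (args[0] in the driver); the cached CSV is only side data from setup().
        context.write(new Text(value.toString()), new IntWritable(movieRows.size()));
    }
}

If opening the file by its base name fails, context.getCacheFiles() can be inspected in setup() to see where the framework actually placed it.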
