I am new to MapReduce and am trying to extend the word count program. I want to count in how many documents a word appears. Example: if I have 3 documents and the word "Try" appears 3 times in document 1 and 5 times in document 3, I want the final count to be 2.
I am not really sure how to do this. I tried using a WritableComparable class as the key in my mapper, but I kept getting errors when I tried to replace the key with that class, so I abandoned it. I am currently trying to use a Text variable for the key and give it the value "word documentName".
Here is what I have so far:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class wcount {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private Text fileName = new Text();
private String tokens = "[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"”“]";
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String cleanValue = value.toString().toLowerCase().replaceAll(tokens, " ");
String filePathString = ((FileSplit) reporter.getInputSplit()).getPath().getName().toString();
fileName.set(new Text(filePathString));
String line = cleanValue.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
Text k = new Text(word + " " + fileName);
output.collect(k, one);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String[] new_key = key.toString().split(" ");
Text word = new Text();
Text FileName = new Text();
word.set(new_key[0]);
//FileName.set(new_key[1]); //error here
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(FileName, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(wcount.class);
conf.setJobName("wcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setNumReduceTasks(1);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
In my reducer I am trying to separate the key into 2 strings, but the line "FileName.set(new_key[1]);" is giving me an out of bounds exception.
I want to know if it is possible to do this with 1 run of MapReduce or whether I need a second one. An example would be much appreciated.
CodePudding user response:
Validate your inputs
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String[] new_key = key.toString().split("\\s");
        if (new_key.length >= 2) {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(new Text(new_key[1]), new IntWritable(sum));
        } else {
            System.out.printf("Unexpected data: \"%s\"%n", key);
        }
    }
}
You might also want to consider using LongWritable for large counts, or emitting a Text output built from a BigInteger value.
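As a rough illustration, here is a minimal word-count style reducer that sums into a LongWritable instead of an IntWritable. It sticks to the old mapred API used in the question and leaves out the key-splitting from the snippet above; the class name LongSumReduce is just a placeholder:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class LongSumReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, LongWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
        long sum = 0L; // accumulate in a long so very large totals do not overflow an int
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new LongWritable(sum));
    }
}

If you switch the value type like this, the driver also needs conf.setOutputValueClass(LongWritable.class).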
CodePudding user response:
I am posting the code for anyone having the same problem.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class wcount {
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
//private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private Text fileName = new Text();
private String tokens = "[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"”“]";
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String cleanValue = value.toString().toLowerCase().replaceAll(tokens, " ");
String filePathString = ((FileSplit) context.getInputSplit()).getPath().getName().toString();
fileName.set(new Text(filePathString));
String line = cleanValue.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, fileName);
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String tempWord = "";
IntWritable tempvalue = new IntWritable(0);
int sum = 0;
for (Text value : values) {
if(!value.toString().trim().equals(tempWord)) {
sum += 1;
tempWord = value.toString().trim();
}
}
tempvalue.set(sum);
context.write(key, tempvalue);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "Doccount");
job.setJarByClass(wcount.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
System.out.println(success);
}
}
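One caveat about the reducer above: it only bumps the count when the current value differs from the previous one, so it relies on all occurrences of the same file name arriving next to each other, which Hadoop does not guarantee for the values of a key. A safer variant (just a sketch, meant as a drop-in for the same class; it needs additional imports of java.util.Set and java.util.HashSet) collects the distinct file names in a set and writes its size:

public static class Reduce extends Reducer<Text, Text, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // collect every distinct file name seen for this word, independent of the order the values arrive in
        Set<String> docs = new HashSet<String>();
        for (Text value : values) {
            docs.add(value.toString().trim());
        }
        // the number of distinct file names is the number of documents containing the word
        context.write(key, new IntWritable(docs.size()));
    }
}

With the example from the question, this still produces "try 2" for a word that appears in document 1 and document 3, however many times it occurs in each.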