MapReduce - Count the number of documents in which a word appears


I am new to MapReduce and am trying to extend the word count program. I want to count how many documents a word appears in. Example: if I have 3 documents and the word "Try" appears 3 times in document 1 and 5 times in document 3, I want the final count to be 2.

I am not really sure how to do this. I tried using a WritableComparable class as the key in my mapper, but I got errors when I tried to replace the key with that class, so I abandoned it. I am currently trying to use a Text variable for the key and give it the value "word documentName".

Here is what i have so far


    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;
    public class wcount {
        public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
            private Text fileName = new Text();
            private String tokens = "[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"”“]";
            public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
                String cleanValue = value.toString().toLowerCase().replaceAll(tokens, " ");
                String filePathString = ((FileSplit) reporter.getInputSplit()).getPath().getName().toString();
                fileName.set(new Text(filePathString));
                String line = cleanValue.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    Text k = new Text(word   " "   fileName);
                    output.collect(k, one);
        public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {         
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
                String[] new_key = key.toString().split(" ");
                Text word = new Text();
                Text FileName = new Text();
                //FileName.set(new_key[1]); //error here
                int sum = 0;
                while (values.hasNext()) {
                    sum  = values.next().get();
                output.collect(FileName, new IntWritable(sum));
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(wcount.class);
            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

In my reducer I am trying to separate the key into 2 strings, but "FileName.set(new_key[1]);" is giving me an out-of-bounds exception.

I want to know if it's possible to do this with 1 run of MapReduce or if I need a second one. An example would be much appreciated.

Validate your inputs

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {         
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String[] new_key = key.toString().split("\\s");
        if (new_key.length >= 2) {
            int sum = 0;
            while (values.hasNext()) {
                sum  = values.next().get();
            output.collect(new Text(new_key[1]), new IntWritable(sum));
     } else {
         System.out.printf("Unexpected data: \"%s\"%n", key);

You might also want to consider using LongWritable for large counts, or a Text output from a BigInteger value

I am posting the code for anyone having the same problem.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class wcount {
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        //private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private Text fileName = new Text();
        private String tokens = "[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"”“]";
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String cleanValue = value.toString().toLowerCase().replaceAll(tokens, " ");
            String filePathString = ((FileSplit) context.getInputSplit()).getPath().getName().toString();
            fileName.set(new Text(filePathString));
            String line = cleanValue.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                context.write(word, fileName);

    public static class Reduce extends Reducer<Text, Text, Text, IntWritable> {
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {          
            String tempWord = "";
            IntWritable tempvalue = new IntWritable(0);
            int sum = 0;
            for (Text value : values) {
                if(!value.toString().trim().equals(tempWord)) {
                    sum  = 1;
                    tempWord = value.toString().trim();
            context.write(key, tempvalue);

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Doccount");

        FileInputFormat .setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
