Home > Enterprise >  Duplicate "values" for some key in map-reduce java program
Duplicate "values" for some key in map-reduce java program


I am new to MapReduce and Hadoop (Hadoop 3.2.3 and Java 8). I am trying to separate lines based on a symbol in each line. Example: the line "q1,a,q0," should be returned as ('a', "q1,a,q0,"), i.e. as a (key, value) pair. My dataset contains ten (10) lines: five (5) for key 'a' and five for key 'b'.

I expect to get 5 lines for each key, but I always get five for 'a' and 10 for 'b'.



Mapper class:

import java.io.IOException;

import org.apache.hadoop.io.ByteWritable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, ByteWritable ,Text>{
    private  ByteWritable key1 = new ByteWritable();
    //private int n ;
    private int count =0 ;
    private Text wordObject = new Text();
    public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
        String ftext = value.toString();
        for (String line: ftext.split(";")) {
            wordObject = new Text();
            if (line.split(",")[2].equals("b")) {
                key1.set((byte) 'b');
                wordObject.set(line) ;
                continue ;
                key1.set((byte) 'a');
                wordObject.set(line) ;


Reducer class:

import java.io.IOException;

import org.apache.hadoop.io.ByteWritable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class MyReducer extends Reducer<ByteWritable, Text, ByteWritable ,Text>{
    private Integer count=0 ;


    public void reduce(ByteWritable key, Iterable<Text>  values, Context context) throws IOException, InterruptedException {
        for(Text val : values ) {
            count   ;
        Text symb = new Text(count.toString()) ;
        context.write(key , symb);


Driver class:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: %s [generic options] <inputdir> <outputdir>\n", getClass().getSimpleName());
            return -1;
        Job job = new Job(getConf());
        job.setJobName("separation ");
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(), new MyDriver(), args);

CodePudding user response:

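The problem was solved by declaring the variable "count" inside the reduce() method. As an instance field it was never reset between keys, so the count for 'b' also included the five values already counted for 'a'.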

CodePudding user response:

Does your input contain more than one line, with another five b's? I cannot reproduce the problem with that one line, but your code can be cleaned up.

For the following code, I get output as

a 5
b 5
  static class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, ByteWritable, Text> {

    final ByteWritable keyOut = new ByteWritable();
    final Text valueOut = new Text();

    protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, ByteWritable, Text>.Context context) throws IOException, InterruptedException {
      String line = value.toString();
      if (line.isEmpty()) {

      StringTokenizer tokenizer = new StringTokenizer(line, ";");
      while (tokenizer.hasMoreTokens()) {
        String token = tokenizer.nextToken();
        String[] parts = token.split(",");
        String keyStr = parts[2];
        if (keyStr.matches("[ab]")) {
          keyOut.set((byte) keyStr.charAt(0));
          context.write(keyOut, valueOut);

  static class Reducer extends org.apache.hadoop.mapreduce.Reducer<ByteWritable, Text, Text, LongWritable> {

    static final Text keyOut = new Text();
    static final LongWritable valueOut = new LongWritable();

    protected void reduce(ByteWritable key, Iterable<Text> values, org.apache.hadoop.mapreduce.Reducer<ByteWritable, Text, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      keyOut.set(new String(new byte[]{key.get()}, StandardCharsets.UTF_8));
      valueOut.set(StreamSupport.stream(values.spliterator(), true)
                       .mapToLong(v -> 1).sum());
      context.write(keyOut, valueOut);
  • Related