2016-11-09

I am trying to implement a simple GROUP BY in MapReduce. The Reducer class below is not working as expected in Hadoop MapReduce.

My input file contains:

7369,SMITH,CLERK,800,20 
7499,ALLEN,SALESMAN,1600,30 
7521,WARD,SALESMAN,1250,30 
7566,JONES,MANAGER,2975,20 
7654,MARTIN,SALESMAN,1250,30 
7698,BLAKE,MANAGER,2850,30 
7782,CLARK,MANAGER,2450,10 
7788,SCOTT,ANALYST,3000,20 
7839,KING,PRESIDENT,5000,10 
7844,TURNER,SALESMAN,1500,30 
7876,ADAMS,CLERK,1100,20 
7900,JAMES,CLERK,950,30 
7902,FORD,ANALYST,3000,20 
7934,MILLER,CLERK,1300,10 

My Mapper class:

public class Groupmapper extends Mapper<Object,Text,IntWritable,IntWritable> { 
    @Override 
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ 
     String line = value.toString(); 
     String[] parts=line.split(","); 
     String token1=parts[3]; 
     String token2=parts[4]; 
     int deptno=Integer.parseInt(token2); 
     int sal=Integer.parseInt(token1); 
     context.write(new IntWritable(deptno),new IntWritable(sal)); 
    }  
} 

Reducer class:

public class Groupreducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { 
    IntWritable result=new IntWritable(); 
    public void Reduce(IntWritable key,Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ 
     int sum=0; 
     for(IntWritable val:values){ 
      sum+=val.get(); 
     } 
     result.set(sum); 
     context.write(key,result); 
    } 
} 

Driver class:

public class Group { 
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { 
     Configuration conf=new Configuration(); 
     Job job=Job.getInstance(conf,"Group"); 
     job.setJarByClass(Group.class); 
     job.setMapperClass(Groupmapper.class); 
     job.setCombinerClass(Groupreducer.class); 
     job.setReducerClass(Groupreducer.class); 
     job.setOutputKeyClass(IntWritable.class); 
     job.setOutputValueClass(IntWritable.class); 
     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 
     System.exit(job.waitForCompletion(true) ? 0 : 1);   
    } 
} 

The expected output should be:

10  8750 
20  10875 
30  9400 
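As a sanity check, these expected sums can be reproduced in plain Java (no Hadoop involved) by grouping the sample records by department exactly as the mapper parses them; `GroupCheck` and `groupSums` are just illustrative names, not part of the original job:

```java
import java.util.Map;
import java.util.TreeMap;

public class GroupCheck {
    // The sample records from the question: empno,ename,job,sal,deptno
    static final String[] LINES = {
        "7369,SMITH,CLERK,800,20",     "7499,ALLEN,SALESMAN,1600,30",
        "7521,WARD,SALESMAN,1250,30",  "7566,JONES,MANAGER,2975,20",
        "7654,MARTIN,SALESMAN,1250,30","7698,BLAKE,MANAGER,2850,30",
        "7782,CLARK,MANAGER,2450,10",  "7788,SCOTT,ANALYST,3000,20",
        "7839,KING,PRESIDENT,5000,10", "7844,TURNER,SALESMAN,1500,30",
        "7876,ADAMS,CLERK,1100,20",    "7900,JAMES,CLERK,950,30",
        "7902,FORD,ANALYST,3000,20",   "7934,MILLER,CLERK,1300,10"
    };

    // Same parsing as the mapper: field 3 is the salary, field 4 the deptno.
    static Map<Integer, Integer> groupSums(String[] lines) {
        Map<Integer, Integer> sums = new TreeMap<>(); // sorted by deptno
        for (String line : lines) {
            String[] parts = line.split(",");
            sums.merge(Integer.parseInt(parts[4]), Integer.parseInt(parts[3]), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        groupSums(LINES).forEach((dept, sum) -> System.out.println(dept + "\t" + sum));
        // prints 10 8750, 20 10875, 30 9400
    }
}
```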

But it prints the output below. It is not summing the values; it behaves like the identity reducer.

10  1300 
10  5000 
10  2450 
20  1100 
20  3000 
20  800 
20  2975 
20  3000 
30  1500 
30  1600 
30  2850 
30  1250 
30  1250 
30  950 

The reducer function is not working correctly.

Answer


It looks as if your reduce is never being invoked, so taking a closer look at the reducer is the next debugging step.

If you add @Override to your reduce method (as you did on the map method), you will get a "Method does not override method from its superclass" error. That means Hadoop will never call your reduce method and will fall back to the default identity implementation, which simply emits each value unchanged.

The problem is that you have:

public void Reduce(IntWritable key,Iterable<IntWritable> values, Context context)

when it should be:

public void reduce(IntWritable key,Iterable<IntWritable> values, Context context)

The only difference is that the method name must start with a lowercase r.
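The effect can be demonstrated without Hadoop at all. The sketch below uses a hypothetical `BaseReducer` whose default `reduce` acts as the identity, standing in for Hadoop's `Reducer` class: a subclass method named `Reduce` (capital R) is a brand-new method rather than an override, so the framework's default still runs, while the lowercase-r version with `@Override` actually replaces it:

```java
import java.util.List;

class BaseReducer {
    // Stand-in for the framework default: "identity" behavior, no aggregation.
    int reduce(List<Integer> values) {
        return values.get(0);
    }
}

class BrokenReducer extends BaseReducer {
    // Capital R: a new, unrelated method. Adding @Override here would fail to
    // compile with "Method does not override method from its superclass".
    int Reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) sum += v;
        return sum;
    }
}

class FixedReducer extends BaseReducer {
    @Override // compiles: the name matches, so calls dispatch here
    int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) sum += v;
        return sum;
    }
}

public class OverrideDemo {
    public static void main(String[] args) {
        List<Integer> sals = List.of(1300, 5000, 2450); // dept 10 salaries
        BaseReducer broken = new BrokenReducer();
        BaseReducer fixed = new FixedReducer();
        System.out.println(broken.reduce(sals)); // base identity method runs: 1300
        System.out.println(fixed.reduce(sals));  // the override runs: 8750
    }
}
```

This is exactly why annotating the real reduce method with @Override (as the mapper already does for map) is worth the habit: the compiler catches the mismatch instead of the job silently running the identity reducer.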