2012-07-31 84 views
9

在很多MapReduce程序中,我也看到一个reducer被用作组合器。我知道这是因为这些计划的具体性质。但我想知道他们是否会有所不同。组合器和减速器可以不同?

回答

22

是的,组合器可以与Reducer不同,但Combiner仍然会实现Reducer接口。合并器只能用于需要依赖工作的特定情况。 Combiner将像Reducer一样运行,但只能在每个Mapper的键/值输出的子集上运行。

与Reducer不同,Combiner具有的一个约束是输入/输出键和值类型必须匹配Mapper的输出类型。

+1

能合成器被描述为本地减速? – user1170330 2015-05-28 01:09:50

+1

在这个链接真的有用的东西。它解释了何时应该使用组合器http://www.philippeadjiman.com/blog/2010/01/14/hadoop-tutorial-series-issue-4-to-use-or-not-to-use-a-combiner / – mk2 2016-04-26 21:26:43

7

是的,他们肯定会有所不同,但我不认为你想要使用不同的班级,大多数情况下你会得到意想不到的结果。

合并器只能用于可交换函数(a.b = b.a)和关联{a。(b.c)=(a.b).c}。这也意味着组合器只能在你的键和值的一个子集上运行,或者根本不能运行,你仍然希望程序的输出保持不变。

选择具有不同逻辑的不同类可能不会给你一个逻辑输出。

+0

这应该是公认的答案 – mk2 2016-04-26 21:29:39

0

组合器的主要目标是优化/最小化将在映射器和缩减器之间跨网络混洗的键值对的数量,从而尽可能节省最多的带宽。

组合器的经验法则是它必须有相同的输入和输出变量的类型,这样做的原因 ,是组合使用不能保证,它可以或不可以被使用,这取决于体积 和数量泄漏。

当减速机满足该规则,即相同的输入和输出变量类型时,可用作组合器。

组合器的另一个最重要的规则是它只能用于你想要应用的功能 是可交换和关联的。比如添加数字。但不是像平均值(如果您使用与缩减器相同的代码)。

现在回答你的问题,是的,当然他们可以是不同的,当你的减速机有不同类型的输入和输出变量,你别无选择,只能做一个不同的减速机代码副本并修改它。

如果你关心reducer的逻辑,你也可以以不同的方式实现,比如说在组合器的情况下,你可以让一个集合对象拥有一个本地缓冲区,其中包含所有值的合并器,这比在减速器中使用它的风险要小,因为在减速器的情况下,它比组合器更容易出现内存不足。其他的逻辑差异当然可以存在和确实。

2

这是实现,你可以运行没有组合器和组合器,都给出了完全相同的答案。这里Reducer和Combiner有不同的动机和不同的实现。

package combiner; 

import java.io.IOException; 


import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

public class Map extends Mapper<LongWritable, Text, Text, Average> { 

Text name = new Text(); 
String[] row; 

protected void map(LongWritable offSet, Text line, Context context) throws IOException, InterruptedException { 
    row = line.toString().split(" "); 
    System.out.println("Key "+row[0]+"Value "+row[1]); 
    name.set(row[0]); 
    context.write(name, new Average(Integer.parseInt(row[1].toString()), 1)); 
}} 

减少类

public class Reduce extends Reducer<Text, Average, Text, LongWritable> { 
    LongWritable avg =new LongWritable(); 
    protected void reduce(Text key, Iterable<Average> val, Context context)throws IOException, InterruptedException { 
    int total=0; int count=0; long avgg=0; 

    for (Average value : val){ 
     total+=value.number*value.count; 
     count+=value.count; 
     avgg=total/count; 
     } 
    avg.set(avgg); 
    context.write(key, avg); 
} 
} 

MapObject的类

public class Average implements Writable { 

long number; 
int count; 

public Average() {super();} 

public Average(long number, int count) { 
    this.number = number; 
    this.count = count; 
} 

public long getNumber() {return number;} 
public void setNumber(long number) {this.number = number;} 
public int getCount() {return count;} 
public void setCount(int count) {this.count = count;} 

@Override 
public void readFields(DataInput dataInput) throws IOException { 
    number = WritableUtils.readVLong(dataInput); 
    count = WritableUtils.readVInt(dataInput);  
} 

@Override 
public void write(DataOutput dataOutput) throws IOException { 
    WritableUtils.writeVLong(dataOutput, number); 
    WritableUtils.writeVInt(dataOutput, count); 

} 
} 

合类

public class Combine extends Reducer<Text, Average, Text, Average>{ 

protected void reduce(Text name, Iterable<Average> val, Context context)throws IOException, InterruptedException { 
    int total=0; int count=0; long avg=0; 

    for (Average value : val){ 
     total+=value.number; 
     count+=1; 
     avg=total/count;  
     } 
    context.write(name, new Average(avg, count)); 

} 
} 

驱动程序类

public class Driver1 { 

public static void main(String[] args) throws Exception { 

    Configuration conf = new Configuration(); 
    if (args.length != 2) { 
     System.err.println("Usage: SecondarySort <in> <out>"); 
     System.exit(2); 
    } 
    Job job = new Job(conf, "CustomCobiner"); 
    job.setJarByClass(Driver1.class); 
    job.setMapperClass(Map.class); 
    job.setCombinerClass(Combine.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Average.class); 
    job.setReducerClass(Reduce.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class);  
    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 

的Git从here

代码离开乌尔建议..