2016-03-15 48 views
2

我试图在矩阵的map reduce-transpose中实现一个简单的问题。 输入:在地图中排序reduce

1 2 3 
    4 5 6 
    7 8 9 

期望输出 -

1 4 7 
    2 5 8 
    7 8 9 

我的地图输出

(0,1) (1,4), (2,7), (0,2) (1,5), (2,8) 

等。

我期待使用减速器方法方法0-{1,2,7} , 1-{4,5,8}并直接使用写入到写在序列化形式的对象。但洗牌和排序不能提供所需的输出。地图方法后,我得到的输出为0-{1,7,2} , 1-{5,4,8}

如何SS在这种情况下工作,如果我的关键是常见的。还有什么解决这个案件。当他们进入reduce阶段

回答

1

钥匙将进行排序,在给定的值集值不会进行排序。
传递给reducer的值不能保证顺序,这不是Hadoop的工作方式。

你的问题是(如你所说)一个'简单的问题'[在许多其他不同的框架和范例]。这个问题是不是一个简单的问题(或适当的)地图减少的问题。


一个解决你的情况有更多复杂的密钥,以确保输出是你要开始的顺序,或通过辅助排序地图通过输出减少工作从密钥生成组合键和个人价值。

+3

http://codingjunkie.net/secondary-sort/二级排序让您订购值。 –

+0

@ BenWatson你是对的!当我向它透露(现在明确提到它)时,原来的问题表明我在提供任何其他类型的答案之前想要解决的一些混乱问题。 – pandorym

0

您可以构造一个也包含列索引的值。

public class ColumnValue implements Writable{ 


    public double column; 
    public double value; 

    public PartialWritablePhase1(long column, double value){ 
     this.column = column; 
     this.value = value;  
    } 


    @Override 
    public void readFields(DataInput in) throws IOException { 
     this.column = in.readLong(); 
     this.value = in.readDouble(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     out.writeLong(column); 
     out.writeDouble(value); 


    } 

    @Override 
    public String toString() { 
      return column+" "+value; 
} 


} 

然后,您可以在减速用它作为这样

public void reduce(LongWritable key, Iterable<ColumnValue> values, Context context) 
      throws IOException, InterruptedException { 

     for (ColumnVal val : values) { 
      //Store values of column in OrderedByColumn an ordered tree set by column 
      // or any structure you want 
     } 


     Iterator<ColumnValue> keySetIterator = OrderedByColumn.iterator(); 

     while(keySetIterator.hasNext()){ 

      context.write(new LongWritable(key.get()), keySetIterator.next()); 
     } 


    } 
0

在Reducer中强制排序值的唯一方法是创建一个自定义组合键并实现您自己的组比较器。这将实现你想要的。

public class CompositeKey implements WritableComparable<CompositeKey> { 
    private int id; 
    private int order; 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     id = in.readInt(); 
     order = in.readInt(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     out.writeInt(id); 
     out.writeInt(order); 
    } 

    @Override 
    public int hashCode() { 
     return id; 
    } 

    @Override 
    public int compareTo(CompositeKey other) { 
     if(this.id != other.id) { 
      return this.id - other.id; 
     } 
     return this.order - other.order; 
    } 

    public int getId() { 
     return id; 
    } 

    public void setId(int id) { 
     this.id = id; 
    } 

    public int getOrder() { 
     return order; 
    } 

    public void setOrder(int order) { 
     this.order = order; 
    } 
} 

id字段对应您的Mapper输出的关键字。订单字段对应于您希望您的值在Reducer中为每个键显示的顺序。 例如,第一个元素和第二个元素(键:{id = 0,order = 1},值:2)现在您的映射器应该输出(键:{id = 0,order = 0},value:1) 。这样你就可以订购你的元素。

最后,为了让您的键值对仅由键ID分组,您需要编写自己的组比较器。

public class CompKeyGroupComparator extends WritableComparator { 
    public CompKeyGroupComparator() { 
     super(CompositeKey.class, true); 
    } 

    @Override 
    public int compare(WritableComparable a, WritableComparable b) { 
     CompositeKey lKey = (CompositeKey) a; 
     CompositeKey rKey = (CompositeKey) b; 
     return lKey.getId() - rKey.getId(); 
    } 
} 

要设置GroupComparator:

job.setGroupingComparatorClass(CompKeyGroupComparator.class); 

现在你的减速将获得与你的映射器下达了命令值。 0- {1,2,7},1- {4,5,8}等