2012-07-12 105 views
0

我有这个hadoop map reduce代码适用于图形数据(以邻接列表形式)和类似于在邻接列表到外部邻接列表转换算法的类型。主MapReduce的任务的代码如下:为什么Hadoop shuffle无法按预期方式工作

public class TestTask extends Configured 
implements Tool { 

public static class TTMapper extends MapReduceBase 
    implements Mapper<Text, TextArrayWritable, Text, NeighborWritable> { 

    @Override 
    public void map(Text key, 
      TextArrayWritable value, 
      OutputCollector<Text, NeighborWritable> output, 
      Reporter reporter) throws IOException { 

     int numNeighbors = value.get().length; 
     double weight = (double)1/numNeighbors; 

     Text[] neighbors = (Text[]) value.toArray(); 

     NeighborWritable me = new NeighborWritable(key, new DoubleWritable(weight)); 

     for (int i = 0; i < neighbors.length; i++) { 
      output.collect(neighbors[i], me); 
     } 
    }  
} 

public static class TTReducer extends MapReduceBase 
    implements Reducer<Text, NeighborWritable, Text, Text> { 

    @Override 
    public void reduce(Text key, 
         Iterator<NeighborWritable> values, 
         OutputCollector<Text, Text> output, 
         Reporter arg3) 
      throws IOException { 

     ArrayList<NeighborWritable> neighborList = new ArrayList<NeighborWritable>(); 

     while(values.hasNext()) { 
      neighborList.add(values.next()); 
     } 

     NeighborArrayWritable neighbors = new NeighborArrayWritable 
          (neighborList.toArray(new NeighborWritable[0])); 

     Text out = new Text(neighbors.toString()); 

     output.collect(key, out); 

    } 

} 

@Override 
public int run(String[] arg0) throws Exception { 
    JobConf conf = Util.getMapRedJobConf("testJob", 
             SequenceFileInputFormat.class, 
             TTMapper.class, 
             Text.class, 
             NeighborWritable.class, 
             1, 
             TTReducer.class, 
             Text.class, 
             Text.class, 
             TextOutputFormat.class, 
             "test/in", 
             "test/out"); 
    JobClient.runJob(conf); 
    return 0; 
} 

public static void main(String[] args) throws Exception { 
    int res = ToolRunner.run(new TestTask(), args); 
    System.exit(res); 
} 

} 

辅助代码被以下: TextArrayWritable:

public class TextArrayWritable extends ArrayWritable { 
public TextArrayWritable() { 
    super(Text.class); 
} 

public TextArrayWritable(Text[] values) { 
    super(Text.class, values); 
} 

} 

NeighborWritable:

public class NeighborWritable implements Writable { 

private Text nodeId; 
private DoubleWritable weight; 

public NeighborWritable(Text nodeId, DoubleWritable weight) { 
    this.nodeId = nodeId; 
    this.weight = weight; 
} 

public NeighborWritable() { } 

public Text getNodeId() { 
    return nodeId; 
} 

public DoubleWritable getWeight() { 
    return weight; 
} 

public void setNodeId(Text nodeId) { 
    this.nodeId = nodeId; 
} 

public void setWeight(DoubleWritable weight) { 
    this.weight = weight; 
} 

@Override 
public void readFields(DataInput in) throws IOException { 
    nodeId = new Text(); 
    nodeId.readFields(in); 

    weight = new DoubleWritable(); 
    weight.readFields(in); 
} 

@Override 
public void write(DataOutput out) throws IOException { 
    nodeId.write(out); 
    weight.write(out); 
} 

public String toString() { 
    return "NW[nodeId=" + (nodeId != null ? nodeId.toString() : "(null)") + 
     ",weight=" + (weight != null ? weight.toString() : "(null)") + "]"; 
} 

public boolean equals(Object o) { 
    if (!(o instanceof NeighborWritable)) { 
     return false; 
    } 

    NeighborWritable that = (NeighborWritable)o; 

    return (nodeId.equals(that.getNodeId()) && (weight.equals(that.getWeight()))); 
} 

} 

和的Util类:

public class Util { 

public static JobConf getMapRedJobConf(String jobName, 
               Class<? extends InputFormat> inputFormatClass, 
               Class<? extends Mapper> mapperClass, 
               Class<?> mapOutputKeyClass, 
               Class<?> mapOutputValueClass, 
               int numReducer, 
               Class<? extends Reducer> reducerClass, 
               Class<?> outputKeyClass, 
               Class<?> outputValueClass, 
               Class<? extends OutputFormat> outputFormatClass, 
               String inputDir, 
               String outputDir) throws IOException { 

    JobConf conf = new JobConf(); 

    if (jobName != null) 
     conf.setJobName(jobName); 

    conf.setInputFormat(inputFormatClass); 

    conf.setMapperClass(mapperClass); 

    if (numReducer == 0) { 
     conf.setNumReduceTasks(0); 

     conf.setOutputKeyClass(outputKeyClass); 
     conf.setOutputValueClass(outputValueClass); 

     conf.setOutputFormat(outputFormatClass); 

    } else { 
     // may set actual number of reducers 
     // conf.setNumReduceTasks(numReducer); 

     conf.setMapOutputKeyClass(mapOutputKeyClass); 
     conf.setMapOutputValueClass(mapOutputValueClass); 

     conf.setReducerClass(reducerClass); 

     conf.setOutputKeyClass(outputKeyClass); 
     conf.setOutputValueClass(outputValueClass); 

     conf.setOutputFormat(outputFormatClass); 

    } 

    // delete the existing target output folder 
    FileSystem fs = FileSystem.get(conf); 
    fs.delete(new Path(outputDir), true); 


    // specify input and output DIRECTORIES (not files) 
    FileInputFormat.addInputPath(conf, new Path(inputDir)); 
    FileOutputFormat.setOutputPath(conf, new Path(outputDir)); 

    return conf;   

} 

} 

我输入如下图:(二进制格式,在这里我给文本格式)

1 2 
2 1,3,5 
3 2,4 
4 3,5 
5 2,4 

根据代码的逻辑输出应该是:

1 NWArray[size=1,{NW[nodeId=2,weight=0.3333333333333333],}] 
2 NWArray[size=3,{NW[nodeId=5,weight=0.5],NW[nodeId=3,weight=0.5],NW[nodeId=1,weight=1.0],}] 
3 NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=4,weight=0.5],}] 
4 NWArray[size=2,{NW[nodeId=5,weight=0.5],NW[nodeId=3,weight=0.5],}] 
5 NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=4,weight=0.5],}] 

但输出未来如:

1 NWArray[size=1,{NW[nodeId=2,weight=0.3333333333333333],}] 
2 NWArray[size=3,{NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],}] 
3 NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=2,weight=0.3333333333333333],}] 
4 NWArray[size=2,{NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],}] 
5 NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=2,weight=0.3333333333333333],}] 

我不明白为什么预期输出不出来的原因。任何帮助将不胜感激。

谢谢。对象再利用

while(values.hasNext()) { 
    neighborList.add(values.next()); 
} 

values.next()

+0

看起来你正在做类似于PageRanking的事情。您是否考虑过使用[Apache hama](http://hama.apache.org/)和[Apache Giraph](http://giraph.apache.org/)进行图形处理?这些框架使图形处理更容易。检查这个[Hama Wiki](http://wiki.apache.org/hama/PageRank)的PageRank。 – 2012-07-12 13:40:52

+0

@Praveen是的,我听说过Giraph。他们是不是也使用Hadoop作为基本框架?但是,我认为这对我的工作可能更有效一些,因为我不必在每次迭代中多次复制图表。一旦我完成了Hadoop,我也一定会看到Giraph。感谢您的意见。 – user1484380 2012-07-12 23:27:27

回答

3

你遭遇犯规将返回相同的对象引用,但对象的基本内容将在每次迭代发生变化(readFields方法被调用来重新填充内容)

建议你修改到(你需要从设置方法获取配置conf变量,除非你可以从记者或OutputCollector得到它 - 对不起,我不使用旧的API)

while(values.hasNext()) { 
    neighborList.add(
     ReflectionUtils.copy(conf, values.next(), new NeighborWritable()); 
} 
+0

非常感谢!奇迹般有效!我使用的是Hadoop-0.18.0,其中'ReflectionUtils'没有'copy()'方法。由于我的'neighborWritable'是一个简单的类,我为深拷贝编写了一个拷贝构造函数,它工作得很好。 – user1484380 2012-07-12 19:06:12

0

但我仍然不明白为什么我的单元测试通过了。下面是代码 -

public class UWLTInitReducerTest { 

private Text key; 
private Iterator<NeighborWritable> values; 
private NeighborArrayWritable nodeData; 
private TTReducer reducer; 

/** 
* Set up the states for calling the map function 
*/ 
@Before 
public void setUp() throws Exception { 
    key = new Text("1001"); 
    NeighborWritable[] neighbors = new NeighborWritable[4]; 
    for (int i = 0; i < 4; i++) { 
     neighbors[i] = new NeighborWritable(new Text("300" + i), new DoubleWritable((double) 1/(1 + i))); 
    } 

    values = Arrays.asList(neighbors).iterator(); 

    nodeData = new NeighborArrayWritable(neighbors); 

    reducer = new TTReducer(); 

} 

/** 
* Test method for InitModelMapper#map - valid input 
*/ 
@Test 
public void testMapValid() { 

    // mock the output object 
    OutputCollector<Text, UWLTNodeData> output = mock(OutputCollector.class); 

    try { 
     // call the API 
     reducer.reduce(key, values, output, null); 

     // in order (sequential) verification of the calls to output.collect() 
     verify(output).collect(key, nodeData); 

    } catch (IOException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 

} 

} 

为什么没有这个代码赶上错误?

+0

因为您有支持值迭代器的数组列表。hadoop实际上只有一个支持迭代器的值,并且每次迭代都会调用readFields来更改它的内容 – 2012-07-12 19:51:41

+0

好吧,我明白了。谢谢! – user1484380 2012-07-12 23:23:19

相关问题