2011-09-27 126 views

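Type mismatch in key from map when replacing Mapper with MultithreadedMapper

I want to implement a MultithreadedMapper for my MapReduce job.

To do this, I replaced Mapper with MultithreadedMapper in code that was already working.

Here is the exception I get: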

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable 
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862) 
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549) 
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) 
at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211) 
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) 
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124) 
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) 
at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264) 

Here is the job setup code:

public static void main(String[] args) { 
    try { 
     if (args.length != 2) { 
      System.err.println("Usage: MapReduceMain <input path> <output path>"); 
      System.exit(123); 
     } 
     Job job = new Job(); 
     job.setJarByClass(MapReduceMain.class); 
     job.setInputFormatClass(TextInputFormat.class); 
     FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration()); 
     FileStatus[] files = fs.listStatus(new Path(args[0])); 
     for(FileStatus sfs:files){ 
      FileInputFormat.addInputPath(job, sfs.getPath()); 
     } 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.setMapperClass(MyMultithreadMapper.class); 
     job.setReducerClass(MyReducer.class); 
     MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads); 

     job.setOutputKeyClass(IntWritable.class); 
     job.setOutputValueClass(MyPage.class); 

     job.setOutputFormatClass(SequenceFileOutputFormat.class); // write the result as a SequenceFile

     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

And here is the mapper code:

public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> { 

ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>(); 

public static final int    nThreads = 5; 

public MyMultithreadMapper() { 
    for (int i = 0; i < nThreads; i++) { 
     scrapers.add(new MyScraper()); 
    } 
} 

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
    MyScraper scraper = scrapers.poll(); 

    MyPage result = null; 
    for (int i = 0; i < 10; i++) { 
     try { 
      result = scraper.scrapPage(value.toString(), true); 
      break; 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    if (result == null) { 
     result = new MyPage(); 
     result.setUrl(key.toString()); 
    } 

    context.write(new IntWritable(result.getUrl().hashCode()), result); 

    scrapers.add(scraper); 
}
}

Why am I getting this?

Answers


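Here is what needs to be done:

MultithreadedMapper.setMapperClass(job, MyMapper.class);

MyMapper must implement the map logic.

The MultithreadedMapper subclass must be empty. MultithreadedMapper never calls its own map() method; each of its threads runs an instance of the delegate mapper class set above. If no delegate is set, the default is the base Mapper, whose identity map() passes the LongWritable input key straight through. That is why the stack trace shows Mapper.map and the key type mismatch.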
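The steps above could look roughly like the sketch below. It is not a tested drop-in: MyPage, MyScraper and MyReducer are the asker's own classes, which are not shown in the question, and the retry loop is left out for brevity. Instead of keeping an empty subclass, the sketch registers MultithreadedMapper itself as the job's mapper, which should have the same effect. It also assumes that MultithreadedMapper creates one MyMapper instance per thread, so each instance can keep its own scraper and the shared queue is no longer needed.

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class MapReduceMain {

    // The map logic lives in a plain Mapper, not in a MultithreadedMapper subclass.
    public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, MyPage> {

        // Assumption: one MyMapper (and so one scraper) per worker thread.
        private final MyScraper scraper = new MyScraper();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            MyPage result;
            try {
                result = scraper.scrapPage(value.toString(), true);
            } catch (Exception e) {
                // Fallback page, filled in the same way as in the question.
                result = new MyPage();
                result.setUrl(key.toString());
            }
            context.write(new IntWritable(result.getUrl().hashCode()), result);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MapReduceMain <input path> <output path>");
            System.exit(123);
        }
        Job job = new Job(); // constructor used in the question
        job.setJarByClass(MapReduceMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        // Simplification: pass the input directory directly instead of listing its files.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // The job runs MultithreadedMapper, which delegates to MyMapper on 5 threads.
        job.setMapperClass(MultithreadedMapper.class);
        MultithreadedMapper.setMapperClass(job, MyMapper.class);
        MultithreadedMapper.setNumberOfThreads(job, 5);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(MyPage.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

With this wiring the keys reaching the output collector come from MyMapper.map(), so they are IntWritable as declared in job.setOutputKeyClass.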
