2013-02-10 41 views
2

我在Hadoop上实现PageRank算法。正如标题所示,在尝试执行代码时出现了如下错误:Hadoop执行错误:map输出键类型不匹配:期望 org.apache.hadoop.io.Text,收到 org.apache.hadoop.io.LongWritable

类型不匹配:期望 org.apache.hadoop.io.Text,收到 org.apache.hadoop.io.LongWritable

在我的输入文件中,我将图形节点ID作为关键字和一些关于它们的信息存储为值。我的输入文件的格式如下:

1 \ t 3.4,2,5,6,67

4 \ t 4.2,77,2,7,83

... ..

试图了解错误说什么我尝试使用LongWritable作为我的主变量类型,如下面的代码中所示。这意味着我有:

Map<LongWritable, LongWritable, LongWritable, LongWritable>

Reduce<LongWritable, LongWritable, LongWritable, LongWritable>

而且,我也试过:

Map<Text, Text, Text, Text>

Reduce<Text, Text, Text, Text>

也:

Map<LongWritable, Text, LongWritable, Text>

Reduce<LongWritable, Text, LongWritable, Text>

但我总是得到同样的错误。我想我没有理解错误信息中"期望(expected)"和"收到(received)"的含义。这是否意味着我的map函数期望从输入文件中得到Text,却收到了LongWritable?是我使用的输入文件格式有问题,还是变量类型有问题?

下面是完整的代码,你能告诉我什么改变在哪里?:

import java.io.IOException; 
import java.util.*; 
import java.util.regex.Matcher; 
import java.util.regex.Pattern; 
import java.lang.Object.*; 

import org.apache.commons.cli.ParseException; 
import org.apache.commons.lang.StringUtils; 
import org.apache.commons.configuration.Configuration; 
import org.apache.hadoop.security.Credentials; 
import org.apache.log4j.*; 
import org.apache.commons.logging.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 



public class Pagerank 
{ 


public static class PRMap extends Mapper<LongWritable, LongWritable, LongWritable, LongWritable> 
{ 

    public void map(LongWritable lineNum, LongWritable line, OutputCollector<LongWritable, LongWritable> outputCollector, Reporter reporter) throws IOException, InterruptedException 
    { 
     if (line.toString().length() == 0) { 
      return; 
     } 

     Text key = new Text(); 
     Text value = new Text(); 
     LongWritable valuel = new LongWritable(); 
     StringTokenizer spline = new StringTokenizer(line.toString(),"\t"); 
     key.set(spline.nextToken()); 
     value.set(spline.nextToken()); 

     valuel.set(Long.parseLong(value.toString())); 
     outputCollector.collect(lineNum,valuel); 


     String info = value.toString(); 
     String splitter[] = info.split(","); 

     if(splitter.length >= 3) 
     { 
      float f = Float.parseFloat(splitter[0]); 
      float pagerank = f/(splitter.length - 2); 

      for(int i=2;i<splitter.length;i++) 
      { 
       LongWritable key2 = new LongWritable(); 
       LongWritable value2 = new LongWritable(); 
       long l; 

       l = Long.parseLong(splitter[i]); 
       key2.set(l); 
       //key2.set(splitter[i]); 
       value2.set((long)f); 

       outputCollector.collect(key2, value2); 
      } 
     } 
    } 
} 

public static class PRReduce extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable> 
{ 
    private Text result = new Text(); 
    public void reduce(LongWritable key, Iterator<LongWritable> values,OutputCollector<LongWritable, LongWritable> results, Reporter reporter) throws IOException, InterruptedException 
    { 

     float pagerank = 0; 
     String allinone = ","; 
     while(values.hasNext()) 
     { 
      LongWritable temp = values.next(); 
      String converted = temp.toString(); 
      String[] splitted = converted.split(","); 

      if(splitted.length > 1) 
      {     
       for(int i=1;i<splitted.length;i++) 
       { 
        allinone = allinone.concat(splitted[i]); 
        if(i != splitted.length - 1) 
         allinone = allinone.concat(","); 
       } 
      } 
      else 
      { 
       float f = Float.parseFloat(splitted[0]); 
       pagerank = pagerank + f; 
      } 
     } 
     String last = Float.toString(pagerank); 
     last = last.concat(allinone); 

     LongWritable value = new LongWritable(); 
     value.set(Long.parseLong(last)); 

     results.collect(key, value); 
    }  
} 



public static void main(String[] args) throws Exception 
{  


    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); 

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    if (otherArgs.length != 2) { 
     System.err.println("Usage: wordcount <in> <out>"); 
     System.exit(2); 
    } 

    Job job = new Job(conf, "pagerank_itr0"); 

    job.setJarByClass(Pagerank.class);  
    job.setMapperClass(Pagerank.PRMap.class);  
    job.setReducerClass(Pagerank.PRReduce.class);  


    job.setOutputKeyClass(LongWritable.class);    
    job.setOutputValueClass(LongWritable.class);    
    FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
    job.waitForCompletion(true); 

} 
} 

回答

3

您没有在作业配置中设置map输出的键/值类。当map输出类型与作业(reduce)输出类型不同时,必须显式设置它们,否则Hadoop会默认二者一致并报出该类型不匹配错误。请在作业配置中加入:

job.setMapOutputKeyClass(LongWritable.class);

job.setMapOutputValueClass(LongWritable.class);

相关问题