2012-07-25 107 views
2

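Type mismatch in key from map while using SequenceFileInputFormat correctly

I am trying to run the recommender example from chapter 6 (listings 6.1 to 6.4) of the ebook Mahout in Action. There are two mapper/reducer pairs. Here is the code: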

Mapper - 1

public class WikipediaToItemPrefsMapper extends 
     Mapper<LongWritable,Text,VarLongWritable,VarLongWritable> { 

private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

@Override 
public void map(LongWritable key, 
      Text value, 
      Context context) 
throws IOException, InterruptedException { 

    String line = value.toString(); 
    Matcher m = NUMBERS.matcher(line); 
    m.find(); 
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group())); 
    VarLongWritable itemID = new VarLongWritable(); 
    while (m.find()) { 
     itemID.set(Long.parseLong(m.group())); 
     context.write(userID, itemID); 
    } 
} 

}

Reducer - 1

public class WikipediaToUserVectorReducer extends 
     Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> { 
@Override 
public void reduce(VarLongWritable userID, 
        Iterable<VarLongWritable> itemPrefs, 
        Context context) 
    throws IOException, InterruptedException { 

     Vector userVector = new RandomAccessSparseVector(
     Integer.MAX_VALUE, 100); 
     for (VarLongWritable itemPref : itemPrefs) { 
      userVector.set((int)itemPref.get(), 1.0f); 
     } 

     //LongWritable userID_lw = new LongWritable(userID.get()); 
     context.write(userID, new VectorWritable(userVector)); 
     //context.write(userID_lw, new VectorWritable(userVector)); 
} 

}

The reducer outputs a userID and a userVector, and it looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}, provided FileInputFormat and TextInputFormat are used in the driver.

I want to use another mapper/reducer pair to process this data further:

Mapper - 2

public class UserVectorToCooccurenceMapper extends 
Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable> { 

@Override 
public void map(VarLongWritable userID, 
       VectorWritable userVector, 
       Context context) 
throws IOException, InterruptedException { 

    Iterator<Vector.Element> it = userVector.get().iterateNonZero(); 
    while (it.hasNext()) { 
     int index1 = it.next().index(); 
     Iterator<Vector.Element> it2 = userVector.get().iterateNonZero(); 
     while (it2.hasNext()) { 
      int index2 = it2.next().index(); 
       context.write(new IntWritable(index1), 
           new IntWritable(index2)); 
     } 
    } 
} 

}

Reducer - 2

public class UserVectorToCooccurenceReducer extends 
     Reducer<IntWritable,IntWritable,IntWritable,VectorWritable> {

@Override 
public void reduce(IntWritable itemIndex1, 
      Iterable<IntWritable> itemIndex2s, 
      Context context) 
throws IOException, InterruptedException { 

    Vector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100); 
    for (IntWritable intWritable : itemIndex2s) { 
     int itemIndex2 = intWritable.get(); 
     cooccurrenceRow.set(itemIndex2, cooccurrenceRow.get(itemIndex2) + 1.0); 
    } 
    context.write(itemIndex1, new VectorWritable(cooccurrenceRow)); 
} 

}

Here is the driver I am using:

public final class RecommenderJob extends Configured implements Tool { 

@Override 
public int run(String[] args) throws Exception {

Job job_preferenceValues = new Job (getConf()); 
    job_preferenceValues.setJarByClass(RecommenderJob.class); 
    job_preferenceValues.setJobName("job_preferenceValues"); 

    job_preferenceValues.setInputFormatClass(TextInputFormat.class); 
    job_preferenceValues.setOutputFormatClass(SequenceFileOutputFormat.class); 

    FileInputFormat.setInputPaths(job_preferenceValues, new Path(args[0])); 
    SequenceFileOutputFormat.setOutputPath(job_preferenceValues, new Path(args[1])); 

    job_preferenceValues.setMapOutputKeyClass(VarLongWritable.class); 
    job_preferenceValues.setMapOutputValueClass(VarLongWritable.class); 

    job_preferenceValues.setOutputKeyClass(VarLongWritable.class); 
    job_preferenceValues.setOutputValueClass(VectorWritable.class); 

    job_preferenceValues.setMapperClass(WikipediaToItemPrefsMapper.class); 
    job_preferenceValues.setReducerClass(WikipediaToUserVectorReducer.class); 

    job_preferenceValues.waitForCompletion(true); 

    Job job_cooccurence = new Job (getConf()); 
    job_cooccurence.setJarByClass(RecommenderJob.class); 
    job_cooccurence.setJobName("job_cooccurence"); 

    job_cooccurence.setInputFormatClass(SequenceFileInputFormat.class); 
    job_cooccurence.setOutputFormatClass(TextOutputFormat.class); 

    SequenceFileInputFormat.setInputPaths(job_cooccurence, new Path(args[1])); 
    FileOutputFormat.setOutputPath(job_cooccurence, new Path(args[2])); 

    job_cooccurence.setMapOutputKeyClass(VarLongWritable.class); 
    job_cooccurence.setMapOutputValueClass(VectorWritable.class); 

    job_cooccurence.setOutputKeyClass(IntWritable.class); 
    job_cooccurence.setOutputValueClass(VectorWritable.class); 

    job_cooccurence.setMapperClass(UserVectorToCooccurenceMapper.class); 
    job_cooccurence.setReducerClass(UserVectorToCooccurenceReducer.class); 

    job_cooccurence.waitForCompletion(true); 

    return 0; 

}

public static void main(String[] args) throws Exception { 
ToolRunner.run(new Configuration(), new RecommenderJob(), args); 

} }

The error I get is:

java.io.IOException: Type mismatch in key from map: expected org.apache.mahout.math.VarLongWritable, received org.apache.hadoop.io.IntWritable 

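While googling for a fix, I found that my problem is similar to this question. The difference is that I am already using SequenceFileInputFormat and SequenceFileOutputFormat, correctly I believe. I also see that org.apache.mahout.cf.taste.hadoop.item.RecommenderJob does more or less the same thing. My understanding, which matches the Yahoo Tutorial, is this: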

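SequenceFileOutputFormat rapidly serializes arbitrary data types to a file; the corresponding SequenceFileInputFormat deserializes the file into the same types and presents the data to the next Mapper in the same manner as it was emitted by the previous Reducer.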

What am I doing wrong? I would really appreciate some pointers. I have spent a whole day trying to fix this and got nowhere :(

Answers

2

Your second mapper has the following signature:

public class UserVectorToCooccurenceMapper extends 
     Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable> 

But your driver code declares the map output types as follows:

job_cooccurence.setMapOutputKeyClass(VarLongWritable.class); 
job_cooccurence.setMapOutputValueClass(VectorWritable.class); 

The mapper emits <IntWritable, IntWritable>, and the reducer expects the same types as input, so you just need to change your driver code to:

job_cooccurence.setMapOutputKeyClass(IntWritable.class); 
job_cooccurence.setMapOutputValueClass(IntWritable.class); 
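
To see why the job fails at the map stage rather than in the SequenceFile reading, here is a minimal plain-Java sketch of the kind of check the framework applies to each emitted key: the runtime class of the key is compared with the class declared through setMapOutputKeyClass. The classes IntKey and VarLongKey and the methods collect and accepts are hypothetical stand-ins made up for this illustration; they are not the Hadoop or Mahout classes, and the real check lives inside Hadoop's map output collector.

```java
import java.io.IOException;

class MapOutputTypeCheck {

    // Hypothetical stand-ins for the Writable key types (NOT the real Hadoop/Mahout classes).
    static class IntKey {
        final int value;
        IntKey(int value) { this.value = value; }
    }

    static class VarLongKey {
        final long value;
        VarLongKey(long value) { this.value = value; }
    }

    // Assumed shape of the collector-side check: the emitted key's runtime class
    // must be exactly the class the driver declared as the map output key class.
    static void collect(Class<?> declaredKeyClass, Object key) throws IOException {
        if (key.getClass() != declaredKeyClass) {
            throw new IOException("Type mismatch in key from map: expected "
                    + declaredKeyClass.getName()
                    + ", received " + key.getClass().getName());
        }
    }

    // Convenience wrapper: true if the key passes the check, false if it is rejected.
    static boolean accepts(Class<?> declaredKeyClass, Object key) {
        try {
            collect(declaredKeyClass, key);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Object emitted = new IntKey(590); // what the second mapper actually writes

        // Driver as posted: declared key class differs from the emitted one.
        System.out.println(accepts(VarLongKey.class, emitted)); // prints false

        // Driver after the fix: declared key class matches the emitted one.
        System.out.println(accepts(IntKey.class, emitted)); // prints true
    }
}
```

The input types of the mapper (VarLongWritable, VectorWritable) come from the SequenceFile itself, which is why that part of the setup was already fine; only the declared map output types needed to match what the mapper writes.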

Thanks a lot. I feel lame for making that mistake :) – Alps 2012-07-26 15:20:31

np, sometimes you just need a fresh pair of eyes! – 2012-07-26 16:18:30