2017-09-25 87 views
1

我试图通过制作一个电影推荐系统来练习大数据 MapReduce,想弄清楚下面这段 MapReduce 代码的问题。我的代码如下:

// imports omitted



public class MRS { 
    public static class Map extends Mapper<LongWritable, Text, Text, Text> { 
     public void map(LongWritable key, Text value, Context con) 
       throws IOException, InterruptedException { 
      String line = value.toString(); 

      StringTokenizer token = new StringTokenizer(line); 

     while(token.hasMoreTokens()){ 
      String userId = token.nextToken(); 
      String movieId = token.nextToken(); 
      String ratings =token.nextToken(); 
      token.nextToken(); 
      con.write(new Text(userId), new Text(movieId + "," + ratings)); 
     } 

    } 
} 

public static class Reduce extends 
     Reducer<Text, IntWritable, Text, Text> { 
    public void reduce(Text key, Iterable<Text> value,Context con) throws IOException, InterruptedException{ 
     int item_count=0; 
     int item_sum =0; 
     String result="["; 
     for(Text t : value){ 
      String s = t.toString(); 
      StringTokenizer token = new StringTokenizer(s,","); 
      while(token.hasMoreTokens()){ 
      token.nextToken(); 
      item_sum=item_sum+Integer.parseInt(token.nextToken()); 
      item_count++; 
      } 
      result=result+"("+s+"),"; 


     } 
     result=result.substring(0, result.length()-1); 
     result=result+"]"; 
     result=String.valueOf(item_count)+","+String.valueOf(item_sum)+","+result; 

     con.write(key, new Text(result)); 
    } 
} 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
    Configuration con = new Configuration(); 
    Job job = new Job(con,"Movie Recommendation"); 

    job.setJarByClass(MRS.class); 


    job.setMapperClass(Map.class); 
    job.setCombinerClass(Reduce.class); 
    job.setReducerClass(Reduce.class); 


    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 


    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 


    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 


    System.exit(job.waitForCompletion(true) ? 0 : 1); 

} 

} 

我使用的是来自 here(原帖中的链接)的 MovieLens 数据集,输入文件是 u.data。

运行这段代码后,我期望的输出格式应该是:

userId item_count,item_sum,[带评分的 movieId 列表]

但实际得到的却是:

99 173,4 
99 288,4 
99 66,3 
99 203,4 
99 105,2 
99 12,5 
99 1,4 
99 741,3 
99 895,3 
99 619,4 
99 742,5 
99 294,4 
99 196,4 
99 328,4 
99 120,2 
99 246,3 
99 232,4 
99 181,5 
99 201,3 
99 978,3 
99 123,3 
99 433,4 
99 345,3 

这看起来只是 Mapper 的输出。

回答

0

我对代码做了一些调整,现在得到了与预期完全一致的结果。这是我的新代码:

// imports omitted

public class MRS { 
public static class Map extends 
     Mapper<LongWritable, Text, IntWritable, Text> { 
    public void map(LongWritable key, Text value, Context con) 
      throws IOException, InterruptedException { 
     String line = value.toString(); 
     String[] s = line.split("\t"); 
     StringTokenizer token = new StringTokenizer(line); 

     while (token.hasMoreTokens()) { 
      IntWritable userId = new IntWritable(Integer.parseInt(token 
        .nextToken())); 
      String movieId = token.nextToken(); 
      String ratings = token.nextToken(); 
      token.nextToken(); 
      con.write(userId, new Text(movieId + "," + ratings)); 
     } 

    } 
} 

public static class Reduce extends 
     Reducer<IntWritable, Text, IntWritable, Text> { 
    public void reduce(IntWritable key, Iterable<Text> value, Context con) 
      throws IOException, InterruptedException { 
     int item_count = 0; 
     int item_sum = 0; 
     String result = ""; 
     for (Text t : value) { 
      String s = t.toString(); 
      StringTokenizer token = new StringTokenizer(s, ","); 

      result = result + "[" + s + "],"; 

     } 
     result = result.substring(1, result.length() - 2); 

     System.out.println(result); 
     con.write(key, new Text(result)); 
    } 
} 

public static void main(String[] args) throws IOException, 
     ClassNotFoundException, InterruptedException { 
    Configuration con = new Configuration(); 
    Job job = new Job(con, "Movie Recommendation"); 

    job.setJarByClass(MRS.class); 

    job.setMapperClass(Map.class); 
    job.setCombinerClass(Reduce.class); 
    job.setReducerClass(Reduce.class); 

    job.setOutputKeyClass(IntWritable.class); 
    job.setOutputValueClass(Text.class); 

    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 

} 

} 

我修改的地方如下。驱动程序(Driver)代码:

job.setOutputKeyClass(IntWritable.class); 

Mapper 代码:

Mapper<LongWritable, Text, IntWritable, Text> 

Reducer 代码:

public static class Reduce extends 
    Reducer<Text, IntWritable, Text, Text> { 
    public void reduce(Text key, Iterable<Text> value,Context con) throws 
IOException, InterruptedException{ 

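我认为原来的问题是:为什么它只打印了 Mapper 的输出,而 Reducer 根本没有执行?根本原因在于,原代码中 Reducer 的泛型参数写成了 Reducer<Text, IntWritable, Text, Text>,而 reduce 方法的参数却是 Iterable<Text>,所以这个方法并没有重写父类的 reduce。Hadoop 实际调用的是默认的 reduce 实现,它会把 Mapper 的输出原样写出。给 reduce 方法加上 @Override 注解,编译器就能发现这种签名不匹配。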

如果我说错了请纠正我:我的理解是,outputKey 和 outputValue 的类型必须与 Mapper 类的输出类型相匹配。