2016-05-14 194 views
2

(问题:MapReduce 的 Reducer 输出不正确)我有一个很大的 TSV 输入文件,内容如下:

Site1 Tag1 
Site1 Tag34 
Site1 Tag8 
Site2 Tag75 
Site2 Tag54 
Site2 Tag8 
Site3 Tag24 
Site3 Tag34 
Site3 Tag1 
... 

我想用 Hadoop MapReduce 找出输入中所有相似的网站对(即至少共享一个标签的两个网站),并统计每一对网站共享的标签数量。

对于上面这部分输入,期望的输出是:

Site1 Site2 1 // Site1 is similar to Site2 with 1 tag (Tag8) 
Site1 Site3 2 // Site1 is similar to Site3 with 2 tag (Tag1 and Tag34) 
Site2 Site1 1 
Site3 Site1 2 

对于每个网站,我只想输出与它最相似的 10 个网站。

每个网站有 3 个标签。

我打算使用 2 个 MapReduce 作业:

  1. 第一个作业以标签为键、网站为值进行映射,并按标签归约;在 reduce 阶段取出拥有该标签的所有网站,输出 '标签 SiteX SiteY'。
  2. 第二个 MapReduce 作业以第一个作业的输出为输入,按 (SiteX, SiteY) 对分组(GROUP BY),得到每对相似网站共享的标签数量。

我尝试实现第一个 MapReduce 作业,但得到的输出只有“标签, 网站”。

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class RawToSimilarTagMapper { 

    public static class TagToSiteMapper extends Mapper<Object, Text, Text, Text> { 

     private Text site = new Text(); 
     private Text tag  = new Text(); 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
      String [] siteTag = value.toString().split("\t"); 
      site.set(siteTag[0]); 
      tag.set(siteTag[1]); 

      context.write(tag, site); 
      System.out.println(); 
     } 
    } 

    public static class SimilarSiteReducer extends Reducer<Text, Text, Text, Text> { 
     private Text value = new Text(); 

     public void reduce(Text key, Iterable<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, InterruptedException { 
      for (Text text : values) { 
       for (Text text2 : values) { 
        if (!text.equals(text2)) { 
         value.set(text.toString() + "\t" + text2.toString()); 
         output.collect(key, value); 
        } 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     Configuration conf = new Configuration(); 
     Job job = Job.getInstance(conf, "raw-to-similar"); 
     job.setJarByClass(RawToSimilarTagMapper.class); 
     job.setMapperClass(TagToSiteMapper.class); 
     job.setCombinerClass(SimilarSiteReducer.class); 
     job.setReducerClass(SimilarSiteReducer.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 
     FileInputFormat.addInputPath(job, new Path(args[1])); 
     FileOutputFormat.setOutputPath(job, new Path(args[2])); 
     FileSystem fs = null; 
     Path dstFilePath = new Path(args[2]); 
     try { 
      fs = dstFilePath.getFileSystem(conf); 
      if (fs.exists(dstFilePath)) 
       fs.delete(dstFilePath, true); 
     } catch (IOException e1) { 
      e1.printStackTrace(); 
     } 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

我这里哪里做错了?

另外,对于下一个阶段,我怎样才能为每个网站只取出最相似的前 10 个网站?

回答

1

我会这样做。另外,你可以再写第三个作业,以第二个作业的输出为输入进行排序,从而得到排名前十的网站(提示:只需要编写 Mapper)。注意:这段代码适用于问题中给出的示例数据;对于格式不正确的数据,你可能需要先进行清洗。

最终输出:

Site2 2 
Site2 Site1 1 
Site3 1 
Site3 Site1 2 

代码:

import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 

public class TopSites{ 

    public static class TagToSiteMapper extends Mapper<Object, Text, Text, Text> { 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
      String [] siteTag = value.toString().split("\t"); 
      context.write(new Text(siteTag[1]), new Text(siteTag[0])); 
      System.out.println(siteTag[1] + " --> " + siteTag[0]); 
     } 
    } 


    public static class TagToSiteReducer extends Reducer<Text, Text, Text, Text> { 
     public void reduce(Text key, Iterable<Text> values, Context context) 
       throws IOException, InterruptedException { 
      String l = ""; 
      System.out.print("Key: [" + key.toString() + "] Values: ["); 

      for (Text site : values) 
       l += site + "\t"; 

      l=l.substring(0, l.length()-1); 
      System.out.println(l + "]"); 
      context.write(new Text(key), new Text(l)); 
     } 
    } 
    public static class TopSiteMapper extends Mapper<Object, Text, Text, IntWritable> { 
     private final static IntWritable one = new IntWritable(1); 
     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 

      String [] data = value.toString().split("\t"); 
      String sites =""; 
      System.out.println("map received: "+ value.toString()); 

      for(int i=1;i<data.length;i++) 
       sites += data[i] + "\t";  

      System.out.println(sites.substring(0,sites.length()-1)); 
      context.write(new Text(sites.substring(0,sites.length()-1)), one); 
     } 
    } 

    public static class TopSiteReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 
     public void reduce(Text key, Iterable<IntWritable> values, Context context) 
       throws IOException, InterruptedException { 
      int sum = 0; 
      System.out.print("Key: [" + key.toString() + "] Values: ["); 

      for (IntWritable site : values){ 
       System.out.print(site.get()); 
       sum+=site.get(); 
      } 
      System.out.println("]"); 
      context.write(new Text(key), new IntWritable(sum)); 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     Configuration conf = new Configuration(); 

     Job job = Job.getInstance(conf, "site-to-tag"); 

     job.setJarByClass(TopSites.class); 
     job.setMapperClass(TagToSiteMapper.class); 
     job.setReducerClass(TagToSiteReducer.class); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     MultipleInputs.addInputPath(job, new Path(args[0]),TextInputFormat.class, TagToSiteMapper.class); 

     Path outputpath = new Path(args[1]+"_temp"); 
     FileOutputFormat.setOutputPath(job,outputpath); 

     FileSystem fs = null; 
     Path dstFilePath = new Path(args[1]); 
     try { 
      fs = dstFilePath.getFileSystem(conf); 
      if (fs.exists(dstFilePath)) 
       fs.delete(dstFilePath, true); 

      dstFilePath = new Path(args[1]+"_temp"); 
      fs = dstFilePath.getFileSystem(conf); 
      if (fs.exists(dstFilePath)) 
       fs.delete(dstFilePath, true); 
     } catch (IOException e1) { 
      e1.printStackTrace(); 
     } 

     int code = job.waitForCompletion(true)?0:1; 
     if(code == 0) 
     { 
      Job SecondJob = Job.getInstance(conf, "Tag-to-Sites"); 
      SecondJob.setJarByClass(TopSites.class); 

      SecondJob.setOutputKeyClass(Text.class); 
      SecondJob.setOutputValueClass(IntWritable.class); 

      SecondJob.setMapperClass(TopSiteMapper.class); 
      SecondJob.setCombinerClass(TopSiteReducer.class); 
      SecondJob.setReducerClass(TopSiteReducer.class); 


      FileInputFormat.addInputPath(SecondJob,new Path(args[1]+ "_temp")); 
      FileOutputFormat.setOutputPath(SecondJob,new Path(args[1])); 
      int exitCode = SecondJob.waitForCompletion(true)?0:1; 
      FileSystem.get(conf).delete(new Path(args[1]+"_temp"), true); 
      System.exit(exitCode); 
     } 
    } 
} 

控制台标准输出(stdout):

Tag1 --> Site1 
Tag34 --> Site1 
Tag8 --> Site1 
Tag75 --> Site2 
Tag54 --> Site2 
Tag8 --> Site2 
Tag24 --> Site3 
Tag34 --> Site3 
Tag1 --> Site3 
Key: [Tag1] Values: [Site3 Site1] 
Key: [Tag24] Values: [Site3] 
Key: [Tag34] Values: [Site3 Site1] 
Key: [Tag54] Values: [Site2] 
Key: [Tag75] Values: [Site2] 
Key: [Tag8] Values: [Site2 Site1] 
map received: Tag1 Site3 Site1 
Site3 Site1 
map received: Tag24 Site3 
Site3 
map received: Tag34 Site3 Site1 
Site3 Site1 
map received: Tag54 Site2 
Site2 
map received: Tag75 Site2 
Site2 
map received: Tag8 Site2 Site1 
Site2 Site1 
Key: [Site2] Values: [11] 
Key: [Site2 Site1] Values: [1] 
Key: [Site3] Values: [1] 
Key: [Site3 Site1] Values: [11] 
Key: [Site2] Values: [2] 
Key: [Site2 Site1] Values: [1] 
Key: [Site3] Values: [1] 
Key: [Site3 Site1] Values: [2] 
+0

对于第二个/第三个作业,你也可以使用 Pig 脚本来实现。

0

看起来问题出在你的 Combiner 上。Combiner 的输入和输出格式必须与 Mapper 的输出格式相同,而你的代码并不满足这一点。Combiner 只用于性能优化,你可以先把它注释掉,再运行同样的作业。