2013-12-23 44 views
0

我目前正在尝试学习hadoop编程,并编写处理一个映射器内两个输入源的程序。这项工作与mapside-join问题类似。hadoop设置方法映射器

所以,我第一次使用分布式缓存,但是,它不工作。因此我第二次使用了setup()函数。它在单个PC上以本地执行模式运行良好,但在集群环境中无法运行。

我完全不知道原因。

如果我们使用setup()函数,集群是否有配置?

而下面是我的代码的一部分。这部分是体现迭代工作的工作驱动程序。

public int run(String[] arg0) throws Exception { 


    // TODO Auto-generated method stub 

    int iteration = 1; 

    Configuration conf = new Configuration(); 

    Path in = new Path(arg0[0]); 
    Path out = new Path(arg0[1]+"iteration_"+iteration); 

    conf.set("conf.threshold", arg0[2]); 

    Job job = new Job(conf, "Test"); 
    job.setJarByClass(getClass()); 
    job.setMapperClass(FirstMap.class); 
    job.setReducerClass(FirstReduce.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
    FileInputFormat.addInputPath(job, in); 
    FileOutputFormat.setOutputPath(job, out); 
    job.waitForCompletion(true); 


    // start second job 

// long counter = 4;//job.getCounters().findCounter(SecondReduce.Counter.CONVERGED).getValue(); 
    String PriorPath = out.toString(); 

    boolean Updates = true; 
    while (Updates) { 

     iteration ++; 
     conf = new Configuration(); 

     Path out2 = new Path(arg0[1]+"iteration_"+iteration); 

     conf.set("prior.job.out", PriorPath); 
     conf.set("conf.iteration", iteration+""); 
     job = new Job(conf, "job"+iteration); 
     job.setJarByClass(getClass()); 
     job.setMapperClass(SecondMap.class); 
     job.setReducerClass(SecondReduce.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(IntWritable.class); 

     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(IntWritable.class); 


     FileInputFormat.addInputPath(job, in); 
     FileOutputFormat.setOutputPath(job, out2); 
     job.waitForCompletion(true); 

     PriorPath = out2.toString(); 

     long counter = job.getCounters().findCounter(Counter.CONVERGED).getValue(); 
     Updates = (counter > 0); 
     System.out.println("counter : " + counter); 
    } 

    return 0; 
} 

此外,包含设置功能的映射器如下。

public static class SecondMap extends 
     Mapper<LongWritable, Text, Text, IntWritable> { 

    IntWritable one = new IntWritable(1); 
    Vector<String> Vec = new Vector<String>(); 
    Vector<String> Gen = new Vector<String>(); 
    int iteration; 
    @Override 
    public void setup(Context context) throws IOException, 
      InterruptedException { 
     super.setup(context); 
     Configuration conf = context.getConfiguration(); 
     Path Cand = new Path(conf.get("prior.job.out")); 
    // iteration = Integer.parseInt(conf.get("conf.iteration")); 
     String iter = conf.get("conf.iteration"); 
     iteration = Integer.parseInt(iter); 

     try { 
      FileSystem fs = FileSystem.get(conf); 
      FileStatus[] status = fs.listStatus(Cand); 
      for (int i = 0; i < status.length; i++) { 
       BufferedReader br = new BufferedReader(
         new InputStreamReader(fs.open(status[i].getPath()))); 
       String line; 
       line = br.readLine(); 
       while (line != null) { 
        System.out.println(line); 

        Vec.add(line); 
        line = br.readLine(); 
       } 
      } 
     } catch (Exception e) { 
      System.out.println("File not found"); 
     } 
     Gen = GenerateCandidate(Vec, iteration); 

    } 

    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 
     // something with CandGen 

     } 
    } 
} 

任何有此经验的人?

+0

你是什么意思你的设置功能不起作用?发生什么事?此外,您不应该尝试避免分布式缓存,因为它看起来不太好。它适用于我们大多数人。 –

+0

感谢您的快速回复。不工作的设置功能意味着存储到“Vec”中的值在集群中没有任何意义,尽管它在PC上运行良好(我使用相同的程序和输入做了简单的测试)。 –

+0

此外,当使用分布式缓存时,如果我通过命令输入缓存路径,此应用程序将起作用。这意味着修改程序中的路径会导致我不想要的结果。 –

回答

0

它仅针对每个Mapper任务或Reducer任务调用一次。所以如果10个mapper或reducer被派生出来工作,那么对于每个mapper和reducer它都会被调用一次。 该方法中添加的内容的一般准则是需要做的任何任务,可以在此处写入,例如,获取分布式缓存的路径,将参数传递给映射器和简化器。 类似的是清理方法。