2015-10-14 41 views
0

我想研究一下hadoop并阅读了很多关于如何进行自然连接的内容。我有两个带有键和信息的文件,我想交叉并将最终结果显示为(a,b,c)。Hadoop多个输入错误分组 - 双向加入练习

我的问题是映射器正在为每个文件调用reducer。我期待收到类似(10,[R1,S10,S22])(10是关键字,1,10,22,是具有10个关键字的不同行的值,并且R和S是标记的,因此我可以识别来自哪个表格)。

事情是,我的减速器收到(10,[S10,S22]),并且只有在完成所有S文件后,我才得到另一个键值对,如(10,[R1])。这意味着,它通过每个文件分别按键,并调用减速器

我不确定是否正确的行为,如果我必须以不同的方式配置它,或者我做的一切都是错误的。

我也是新来的java,所以代码可能看起来不好。

我避免使用TextPair数据类型,因为我无法自己想出这件事,我会认为这将是另一种有效的方法(以防万一你想知道)。谢谢

基于WordCount示例运行hadoop 2.4.1。

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Iterator; 
import java.util.StringTokenizer; 

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.TextInputFormat; 
import org.apache.hadoop.mapred.TextOutputFormat; 
import org.apache.hadoop.mapred.lib.MultipleInputs; 

public class TwoWayJoin { 

    public static class FirstMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { 

     public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 
      String line = value.toString(); 
      StringTokenizer tokenizer = new StringTokenizer(line); 

      Text a = new Text(); 
      Text b = new Text(); 

      a.set(tokenizer.nextToken()); 
      b.set(tokenizer.nextToken()); 

      output.collect(b, relation); 
     } 
    } 

    public static class SecondMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { 

     public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 
      String line = value.toString(); 
      StringTokenizer tokenizer = new StringTokenizer(line); 

      Text b = new Text(); 
      Text c = new Text(); 

      b.set(tokenizer.nextToken()); 
      c.set(tokenizer.nextToken()); 

      Text relation = new Text("S"+c.toString()); 

      output.collect(b, relation); 

     } 
    } 

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { 
     public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 

      ArrayList <Text> RelationS = new ArrayList <Text>() ; 
      ArrayList <Text> RelationR = new ArrayList <Text>() ; 

      while (values.hasNext()) { 
       String relationValue = values.next().toString(); 
       if (relationValue.indexOf('R') >= 0){ 
        RelationR.add(new Text(relationValue)); 
       } else { 
        RelationS.add(new Text(relationValue)); 
       } 
      } 

      for(Text r : RelationR) { 
       for (Text s : RelationS) { 
        output.collect(key, new Text(r + "," + key.toString() + "," + s)); 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     JobConf conf = new JobConf(MultipleInputs.class); 
     conf.setJobName("TwoWayJoin"); 

     conf.setOutputKeyClass(Text.class); 
     conf.setOutputValueClass(Text.class); 

     conf.setCombinerClass(Reduce.class); 
     conf.setReducerClass(Reduce.class); 

     conf.setInputFormat(TextInputFormat.class); 
     conf.setOutputFormat(TextOutputFormat.class); 

     MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, FirstMap.class); 
     MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, SecondMap.class); 

     Path output = new Path(args[2]); 

     FileOutputFormat.setOutputPath(conf, output); 

     FileSystem.get(conf).delete(output, true); 

     JobClient.runJob(conf); 

    } 
} 

R.txt

(a b(key)) 
2 46 
1 10 
0 24 
31 50 
11 2 
5 31 
12 36 
9 46 
10 34 
6 31 

S.txt

(b(key) c) 
45 32 
45 45 
46 10 
36 15 
45 21 
45 28 
45 9 
45 49 
45 18 
46 21 
45 45 
2 11 
46 15 
45 33 
45 6 
45 20 
31 28 
45 32 
45 26 
46 35 
45 36 
50 49 
45 13 
46 3 
46 8 
31 45 
46 18 
46 21 
45 26 
24 15 
46 31 
46 47 
10 24 
46 12 
46 36 

输出此代码是成功的,但为空,因为我要么具有阵列ř空或S的阵列空。

我有所有的行映射,如果我只是收集他们一个接一个,而不处理任何事情。

期望输出是

key "a,b,c" 
+0

你能提供输入样本,你在输出中得到了什么,你期待的输出是什么? –

+0

嗨,感谢您的关注,我添加了输入文件和预期输出。 – Djonatan

回答

1

的问题是与组合器。记住组合器在地图输出上应用reduce函数。所以间接地,它的作用是分别在你的R和S关系上应用reduce函数,这就是你在不同的reduce函数中得到R和S关系的原因。 注释掉

conf.setCombinerClass(Reduce.class); 

并尝试再次运行应该没有任何问题。注意,组合函数只有当你感觉你的地图输出的聚合结果在排序和洗牌完成后应用到输入上时才会相同。

+0

我刚刚得出同样的结论,小时输了。谢谢 – Djonatan