我想研究一下hadoop并阅读了很多关于如何进行自然连接的内容。我有两个带有键和信息的文件,我想交叉并将最终结果显示为(a,b,c)。Hadoop多个输入错误分组 - 双向加入练习
我的问题是映射器正在为每个文件调用reducer。我期待收到类似(10,[R1,S10,S22])(10是关键字,1,10,22,是具有10个关键字的不同行的值,并且R和S是标记的,因此我可以识别来自哪个表格)。
事情是,我的减速器收到(10,[S10,S22]),并且只有在完成所有S文件后,我才得到另一个键值对,如(10,[R1])。这意味着,它通过每个文件分别按键,并调用减速器
我不确定是否正确的行为,如果我必须以不同的方式配置它,或者我做的一切都是错误的。
我也是新来的java,所以代码可能看起来不好。
我避免使用TextPair数据类型,因为我无法自己想出这件事,我会认为这将是另一种有效的方法(以防万一你想知道)。谢谢
基于WordCount示例运行hadoop 2.4.1。
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
public class TwoWayJoin {
public static class FirstMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
Text a = new Text();
Text b = new Text();
a.set(tokenizer.nextToken());
b.set(tokenizer.nextToken());
output.collect(b, relation);
}
}
public static class SecondMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
Text b = new Text();
Text c = new Text();
b.set(tokenizer.nextToken());
c.set(tokenizer.nextToken());
Text relation = new Text("S"+c.toString());
output.collect(b, relation);
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
ArrayList <Text> RelationS = new ArrayList <Text>() ;
ArrayList <Text> RelationR = new ArrayList <Text>() ;
while (values.hasNext()) {
String relationValue = values.next().toString();
if (relationValue.indexOf('R') >= 0){
RelationR.add(new Text(relationValue));
} else {
RelationS.add(new Text(relationValue));
}
}
for(Text r : RelationR) {
for (Text s : RelationS) {
output.collect(key, new Text(r + "," + key.toString() + "," + s));
}
}
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(MultipleInputs.class);
conf.setJobName("TwoWayJoin");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, FirstMap.class);
MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, SecondMap.class);
Path output = new Path(args[2]);
FileOutputFormat.setOutputPath(conf, output);
FileSystem.get(conf).delete(output, true);
JobClient.runJob(conf);
}
}
R.txt
(a b(key))
2 46
1 10
0 24
31 50
11 2
5 31
12 36
9 46
10 34
6 31
S.txt
(b(key) c)
45 32
45 45
46 10
36 15
45 21
45 28
45 9
45 49
45 18
46 21
45 45
2 11
46 15
45 33
45 6
45 20
31 28
45 32
45 26
46 35
45 36
50 49
45 13
46 3
46 8
31 45
46 18
46 21
45 26
24 15
46 31
46 47
10 24
46 12
46 36
输出此代码是成功的,但为空,因为我要么具有阵列ř空或S的阵列空。
我有所有的行映射,如果我只是收集他们一个接一个,而不处理任何事情。
期望输出是
key "a,b,c"
你能提供输入样本,你在输出中得到了什么,你期待的输出是什么? –
嗨,感谢您的关注,我添加了输入文件和预期输出。 – Djonatan