可以说我有2 RDDs
:如何星火合并两个RDDS如果存储在关键值相匹配
rdd1 = [ (key1, value1), (key2, value2), (key3, value3) ]
rdd2 = [ (key4, value4), (key5, value5), (key6, value6) ]
我要合并的RDDS当且仅当储存在KEY1在RDD1集=价值=在rdd2中存储在key5中的值。
我该如何去使用Java或Scala在Spark中做这件事?
可以说我有2 RDDs
:如何星火合并两个RDDS如果存储在关键值相匹配
rdd1 = [ (key1, value1), (key2, value2), (key3, value3) ]
rdd2 = [ (key4, value4), (key5, value5), (key6, value6) ]
我要合并的RDDS当且仅当储存在KEY1在RDD1集=价值=在rdd2中存储在key5中的值。
我该如何去使用Java或Scala在Spark中做这件事?
我认为你正在寻找一个加入。
您需要做的第一件事是将它们映射到PairRDD,并将key1,key2等作为键。本例使用Tuple2输入:
JavaPairRDD<Integer, String> pairRdd = rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() {
public Tuple2<Integer, String> call(Tuple2<Integer, String> val) throws Exception {
return new Tuple2<Integer, String>(val._1(), val._2());
}
});
一旦你映射两个,你只需要通过按键来加入他们的行列:
JavaPairRDD<Integer, Tuple2<String, String>> combined = pairRdd.join(pairRdd2);
然后,合并将是这样的:
[ (key1, (value1, value5)), (key2, (value2, value4)) ]
其中key1 == key5和key2 == key4
我给你scala spark的解决方案如下
scala> val rdd1 = sc.parallelize(List((3,"s"),(2,"df"),(1,"i")))
scala> val rdd2 = sc.parallelize(List((1,"ds"),(2,"h"),(1,"i")))
scala> val swaprdd1=rdd1.map(_.swap)
scala> val swaprdd2=rdd2.map(_.swap)
scala> val intersectrdd = rdd1.intersection(rdd2)
scala> val resultrdd = intersectrdd.map(_.swap)
我希望它对您的解决方案有所帮助:)