2017-03-08 283 views
1

我有两个与RDDS以下结构将RDD的每个元素添加到Spark Scala中另一个RDD的每个元素。

org.apache.spark.rdd.RDD[(Long, Double)] 

这里RDD的每一行包含一个索引Long和值Double。我想将RDD的每个元素添加到Spark Scala中其他RDD的每个元素。

一个例子是这样的:

RDD1集:

Array[(Long, Double)] = Array((0,-3),(1,2)) 

RDD2:

Array[(Long, Double)] = Array((0,4),(1,-2)) 

结果:

Array[(Long, Double)] = Array((0,1),(0,-5),(1,6),(1,0)) 

回答

1

你真正在这里做什么是你的两个rdd的的笛卡尔乘积,在那里你只需求和值每导致((key, value), (key, value))对,保持第一元组的关键

val result = rdd1.cartesian(rdd2).map(x => (x._1._1, x._2._2 + x._1._2)) 
// Result 
result.collect() 
Array[(Int, Int)] = Array((0,1), (0,-5), (1,6), (1,0)) 

小心使用cartesian()不过,内存消耗将大大增加你的rdd的更大。

0

请试试这个:

val df1 = Seq((0,-3),(1,2)).toDF("col1", "col2") 
val df2 = Seq((0,4),(1,-2)).toDF("col1", "col2") 

df1.createOrReplaceTempView("temp1") 
df2.createOrReplaceTempView("temp2") 

spark.sql("SELECT t1.col1 + t2.col1, t1.col2 + t2.col2 FROM t1, t2").show 
0

拉链两个RDDS然后映射在该计算的款项

相关问题