0
我有5个洗牌的键值rdds,一个大的(1,000,000条记录)和4个相对较小的(100,000条记录)。所有的rdds都有相同数量的分区,我有两个策略来合并5个分区,spark:如何有效地合并混洗rdd?
- 合并5个RDDS一起
- 合并了4个个小RDDS在一起,然后加入bigone
我认为策略2会更有效,因为它不会重新洗牌大一号。但实验结果显示策略1效率更高。该代码和输出如下:
代码
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
object MergeStrategy extends App {
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val bigRddSize = 1e6.toInt
val smallRddSize = 1e5.toInt
println(bigRddSize)
val bigRdd = sc.parallelize((0 until bigRddSize)
.map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
bigRdd.take(10).foreach(println)
val smallRddList = (0 until 4).map(i => {
val rst = sc.parallelize((0 until smallRddSize)
.map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
println(rst.count)
rst
}).toArray
// strategy 1
{
val begin = System.currentTimeMillis
val s1Rst = sc.union(Array(bigRdd) ++ smallRddList).distinct(100)
println(s1Rst.count)
val end = System.currentTimeMillis
val timeCost = (end - begin)/1000d
println("S1 time count: %.1f s".format(timeCost))
}
// strategy 2
{
val begin = System.currentTimeMillis
val smallMerged = sc.union(smallRddList).distinct(100).cache
println(smallMerged.count)
val s2Rst = bigRdd.fullOuterJoin(smallMerged).flatMap({ case (key, (left, right)) => {
if (left.isDefined && right.isDefined) Array((key, left.get), (key, right.get)).distinct
else if (left.isDefined) Array((key, left.get))
else if (right.isDefined) Array((key, right.get))
else throw new Exception("Cannot happen")
}
})
println(s2Rst.count)
val end = System.currentTimeMillis
val timeCost = (end - begin)/1000d
println("S2 time count: %.1f s".format(timeCost))
}
}
输出
1000000
(688282474,0)
(-255073127,0)
(872746474,0)
(-792516900,0)
(417252803,0)
(-1514224305,0)
(1586932811,0)
(1400718248,0)
(939155130,0)
(1475156418,0)
100000
100000
100000
100000
1399777
S1 time count: 39.7 s
399984
1399894
S2 time count: 49.8 s
我的洗牌RDD理解错了?有人可以提供一些建议吗? 谢谢!
当您在策略2中进行连接时,您没有在策略1(仅联合)中进行任何联接。为什么?请记住,union不需要对数据进行混洗 - 它可以将每个执行器上存在的RDD拼凑起来。更具体地说,Union只创建一个狭义的依赖关系,而join则创建一个shuffle依赖关系。所以看起来策略1和2是苹果和桔子。 –
@SachinTyagi我的目标是区分5 rdds,策略1和2最后都不同。不同的将洗牌数据。由于大rdd已经洗牌,因此策略2不会洗牌大牌,应该更有效率,但实验显示相反。 – bourneli
我不太清楚我的理解,但是无论何时加入,您都会引入一个随机播放的依赖关系,从而最终(重新)洗牌数据。不管你的rdd早些时候是否洗牌。这与你所看到的一致。 –