
Spark: how to efficiently merge shuffled RDDs?

I have 5 shuffled key-value RDDs, one big one (1,000,000 records) and 4 relatively small ones (100,000 records each). All of the RDDs have the same number of partitions, and I have two strategies for merging the 5 of them:

  1. Merge all 5 RDDs together (union + distinct)
  2. Merge the 4 small RDDs together first, then join the result with the big one

I expected strategy 2 to be more efficient, because it would not re-shuffle the big RDD. But the experiment shows that strategy 1 is more efficient. The benchmark code and its output follow below.
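Before the benchmark itself: a quick way to see where shuffles actually happen is to print the RDD lineage with toDebugString, where each new indentation level marks a shuffle boundary. Below is a minimal sketch of that check (my own illustration, not part of the benchmark; the object name and toy sizes are made up):

import org.apache.spark.{SparkConf, SparkContext}

object LineageCheck extends App {

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("lineage"))

    val rdd = sc.parallelize((0 until 1000).map(x => (x, 0))).repartition(10)

    // Each indentation level in the debug string is a shuffle boundary, so this
    // shows whether distinct adds another shuffle on top of repartition.
    println(rdd.distinct(10).toDebugString)

    sc.stop()
}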

Code

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkContext, SparkConf} 


object MergeStrategy extends App { 

    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 

    val conf = new SparkConf().setMaster("local[4]").setAppName("test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val bigRddSize = 1e6.toInt 
    val smallRddSize = 1e5.toInt 
    println(bigRddSize) 

    // big RDD: 1,000,000 random keys, pre-shuffled by repartition
    val bigRdd = sc.parallelize((0 until bigRddSize)
     .map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache 
    bigRdd.take(10).foreach(println) 

    // 4 small RDDs: 100,000 random keys each, pre-shuffled the same way
    val smallRddList = (0 until 4).map(i => {
     val rst = sc.parallelize((0 until smallRddSize) 
      .map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache 
     println(rst.count) 
     rst 
    }).toArray 

    // strategy 1 
    { 
     val begin = System.currentTimeMillis 

     val s1Rst = sc.union(Array(bigRdd) ++ smallRddList).distinct(100) 
     println(s1Rst.count) 

     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("S1 time count: %.1f s".format(timeCost)) 
    } 

    // strategy 2 
    { 
     val begin = System.currentTimeMillis 

     val smallMerged = sc.union(smallRddList).distinct(100).cache 
     println(smallMerged.count) 

     val s2Rst = bigRdd.fullOuterJoin(smallMerged).flatMap { case (key, (left, right)) =>
      // emit whichever side(s) are present; every key exists on at least one side
      if (left.isDefined && right.isDefined) Array((key, left.get), (key, right.get)).distinct
      else if (left.isDefined) Array((key, left.get))
      else if (right.isDefined) Array((key, right.get))
      else throw new Exception("Cannot happen")
     }
     println(s2Rst.count) 

     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("S2 time count: %.1f s".format(timeCost)) 
    } 

} 

Output

1000000 
(688282474,0) 
(-255073127,0) 
(872746474,0) 
(-792516900,0) 
(417252803,0) 
(-1514224305,0) 
(1586932811,0) 
(1400718248,0) 
(939155130,0) 
(1475156418,0) 
100000 
100000 
100000 
100000 
1399777 
S1 time count: 39.7 s 
399984 
1399894 
S2 time count: 49.8 s 

Is my understanding of shuffled RDDs wrong? Could someone give me some advice? Thanks!


You do a join in strategy 2, but no join at all in strategy 1 (union only). Why? Remember that union does not need to shuffle the data: it can stitch together the RDDs as they already sit on the executors. More precisely, union only creates a narrow dependency, while join creates a shuffle dependency. So strategies 1 and 2 look like apples and oranges. –
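The narrow-versus-shuffle distinction the comment describes can be checked directly from code: rdd.dependencies exposes the dependency objects, and walking them up the lineage reveals whether a ShuffleDependency was introduced. A minimal sketch (my own illustration; the helper hasShuffle is made up for the demo):

import org.apache.spark.rdd.RDD
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

object DependencyDemo extends App {

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("deps"))

    // recursively search the lineage for a shuffle boundary
    def hasShuffle(rdd: RDD[_]): Boolean = rdd.dependencies.exists {
      case _: ShuffleDependency[_, _, _] => true
      case dep => hasShuffle(dep.rdd)
    }

    val a = sc.parallelize(Seq((1, 0), (2, 0)))
    val b = sc.parallelize(Seq((1, 0), (3, 0)))

    println(hasShuffle(a.union(b))) // false: union only adds narrow dependencies
    println(hasShuffle(a.join(b))) // true: join introduces a ShuffleDependency

    sc.stop()
}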


@SachinTyagi My goal is to deduplicate the 5 RDDs, and both strategy 1 and strategy 2 end with a distinct. distinct shuffles the data. Since the big RDD has already been shuffled, strategy 2 should not shuffle it again and should therefore be more efficient, but the experiment shows the opposite. – bourneli


I am not entirely sure of my understanding, but whenever you join you introduce a shuffle dependency, and thus end up (re)shuffling the data, no matter whether your RDDs were shuffled earlier. That is consistent with what you are seeing. –
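One detail worth checking here (my own observation, not from the commenters): the question's code pre-shuffles bigRdd with repartition, and repartition distributes records round-robin without recording a key-based partitioner, so a later join cannot tell the data is already grouped by key and must shuffle it again; partitionBy is what records a partitioner. A minimal sketch:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object RepartitionVsPartitionBy extends App {

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("partitioners"))

    val pairs = sc.parallelize((0 until 1000).map(x => (x, 0)))

    // repartition shuffles but records no partitioner, so downstream
    // stages cannot reuse the layout
    println(pairs.repartition(10).partitioner) // None

    // partitionBy shuffles by key and records the partitioner for reuse
    println(pairs.partitionBy(new HashPartitioner(10)).partitioner) // Some(HashPartitioner)

    sc.stop()
}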

Answer


I found a way to merge RDDs more efficiently. Compare the following two merge strategies:

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{HashPartitioner, SparkContext, SparkConf} 
import scala.collection.mutable.ArrayBuffer 

object MergeStrategy extends App { 

    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 

    val conf = new SparkConf().setMaster("local[4]").setAppName("test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val rddCount = 20 
    val mergeCount = 5 
    val dataSize = 20000 
    val parts = 50 

    // generate data 
    scala.util.Random.setSeed(943343) 
    val testData = for (i <- 0 until rddCount) 
     yield sc.parallelize(scala.util.Random.shuffle((0 until dataSize).toList).map(x => (x, 0))) 
      .partitionBy(new HashPartitioner(parts)) 
      .cache 
    testData.foreach(x => println(x.count)) 

    // strategy 1: merge in batches, then explicitly partition each merged result with a HashPartitioner
    { 
     val buff = ArrayBuffer[RDD[(Int, Int)]]() 
     val begin = System.currentTimeMillis 
     for (i <- 0 until rddCount) { 
      buff += testData(i) 
      if ((buff.size >= mergeCount || i == rddCount - 1) && buff.size > 1) { 
       val merged = sc.union(buff).distinct 
        .partitionBy(new HashPartitioner(parts)).cache 
       println(merged.count) 

       buff.foreach(_.unpersist(false)) 
       buff.clear 
       buff += merged 
      } 
     } 
     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("Strategy 1 Time Cost: %.1f".format(timeCost)) 
     assert(buff.size == 1) 

     println("Strategy 1 Complete, with merged Count %s".format(buff(0).count)) 
    } 


    // strategy 2: merge in batches without an explicit partitioner (distinct only)
    { 
     val buff = ArrayBuffer[RDD[(Int, Int)]]() 
     val begin = System.currentTimeMillis 
     for (i <- 0 until rddCount) { 
      buff += testData(i) 
      if ((buff.size >= mergeCount || i == rddCount - 1) && buff.size > 1) { 
       val merged = sc.union(buff).distinct(parts).cache 
       println(merged.count) 

       buff.foreach(_.unpersist(false)) 
       buff.clear 
       buff += merged 
      } 
     } 
     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("Strategy 2 Time Cost: %.1f".format(timeCost)) 
     assert(buff.size == 1) 

     println("Strategy 2 Complete, with merged Count %s".format(buff(0).count)) 
    } 

} 

The result shows that strategy 1 (time cost 20.8 s) is more efficient than strategy 2 (time cost 34.3 s). My machine runs Windows 8 with a 4-core 2.0 GHz CPU and 8 GB of memory.

The only difference is that strategy 1 explicitly partitions each merged result with a HashPartitioner, while strategy 2 does not. As a result, strategy 1's merged RDD is a ShuffledRDD with a known partitioner, while strategy 2's is a MapPartitionsRDD without one. I think this is why RDD.distinct handles the ShuffledRDD more efficiently than the MapPartitionsRDD: when all the inputs share the same partitioner, sc.union can build a partitioner-aware union with far fewer partitions feeding the next distinct, instead of a plain UnionRDD that concatenates all of them.
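The partitioner difference the answer describes can be verified directly: after partitionBy the RDD keeps a HashPartitioner, while distinct(n) ends in a map step that discards it, so the next merge round cannot reuse the layout. A minimal sketch (my own check, under the same local setup as above):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerCheck extends App {

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("check"))

    val data = sc.parallelize((0 until 1000).map(x => (x, 0)))

    val kept = data.distinct.partitionBy(new HashPartitioner(50)) // strategy 1 style
    val dropped = data.distinct(50) // strategy 2 style

    println(kept.partitioner) // Some(HashPartitioner): preserved for the next merge round
    println(dropped.partitioner) // None: the trailing map inside distinct discards it

    sc.stop()
}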