2016-11-16 59 views
4

我跑了火花外壳以下工作:星火UI DAG阶段断开

val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist 
d.join(d.reduceByKey(_ + _)).collect 

星火UI显示三个阶段。阶段4和5对应于d的计算,并且阶段6对应于对collect动作的计算。由于d持续存在,我预计只有两个阶段。然而阶段5目前没有连接到任何其他阶段。

Spark UI DAG

所以尝试没有坚持用运行相同的计算,以及DAG貌似相同,只是没有表示RDD的绿点已经坚持。

Spark UI DAG without persist

我期望级11的输出是连接到平台12的输入,但事实并非如此。

看着舞台描述,阶段似乎表明d正在持续,因为阶段5有输入,但我仍然困惑,为什么阶段5甚至存在。

Spark UI stages

Spark UI stages without persist

回答

1
  1. 输入RDD缓存和缓存部分不重新计算。

    这可以用一个简单的测试来验证:

    import org.apache.spark.SparkContext 
    
    def f(sc: SparkContext) = { 
        val counter = sc.longAccumulator("counter") 
        val rdd = sc.parallelize(0 until 100).map(i => { 
        counter.add(1L) 
        (i%10, i) 
        }).persist 
        rdd.join(rdd.reduceByKey(_ + _)).foreach(_ =>()) 
        counter.value 
    } 
    
    assert(f(spark.sparkContext) == 100) 
    
  2. 缓存不会从DAG中删除阶段。

    如果数据被缓存了相应的阶段can be marked as skipped但仍然是DAG的一部分。可以使用检查点截断谱系,但它不是一回事,也不会从可视化中删除阶段。

  3. 输入阶段包含超过高速缓存的计算。

    Spark阶段将可以链接而不执行混洗的操作组合在一起。

    虽然输入阶段的一部分被缓存,但不包括准备洗牌文件所需的所有操作。这就是为什么你没有看到跳过的任务。

  4. 其余(detachment)只是图形可视化的限制。

  5. 如果首先重新分区的数据:

    import org.apache.spark.HashPartitioner 
    
    val d = sc.parallelize(0 until 1000000) 
        .map(i => (i%100000, i)) 
        .partitionBy(new HashPartitioner(20)) 
    
    d.join(d.reduceByKey(_ + _)).collect 
    

    你会得到DAG你最有可能寻找:

    enter image description here

0

添加到user6910411的详细解答,RDD直到第一个动作运行并且它计算整个DAG时,由于对RDD的惰性评估,它并没有被保留在内存中。因此,当您第一次运行collect()时,RDD“d”会第一次被保存在内存中,但没有任何内容被读取。如果您第二次运行collect(),则会读取缓存的RDD。

此外,如果执行对最终的RDD一个toDebugString,它示出了下面的输出:

scala> d.join(d.reduceByKey(_ + _)).toDebugString 
res5: String = 
(4) MapPartitionsRDD[19] at join at <console>:27 [] 
| MapPartitionsRDD[18] at join at <console>:27 [] 
| CoGroupedRDD[17] at join at <console>:27 [] 
+-(4) MapPartitionsRDD[15] at map at <console>:24 [] 
| | ParallelCollectionRDD[14] at parallelize at <console>:24 [] 
| ShuffledRDD[16] at reduceByKey at <console>:27 [] 
+-(4) MapPartitionsRDD[15] at map at <console>:24 [] 
    | ParallelCollectionRDD[14] at parallelize at <console>:24 [] 

的上面粗略图形表示可以被示出为:RDD Stages