据我所知,有三个工作,因为有三个动作
我甚至会说,有可能是更多的星火工作,但最小值是3
。这一切都取决于变革的实施和所采取的行动。
我不理解的是为什么在作业1个阶段4,5和6是相同的阶段0,1和作业0的2和同样的情况对于工作2.
作业1是在RDD上运行的一些操作的结果,finalRdd
。该RDD是使用(向后退出)创建的:join
,textFile
,map
和distinct
。
val people = sc.textFile("people.csv").map { line =>
val tokens = line.split(",")
val key = tokens(2)
(key, (tokens(0), tokens(1))) }.distinct
val cities = sc.textFile("cities.csv").map { line =>
val tokens = line.split(",")
(tokens(0), tokens(1))
}
val finalRdd = people.join(cities)
运行上述,你会看到相同的DAG。现在
,当你执行leftOuterJoin
或rightOuterJoin
行动,你会得到另外两个的DAG。您正在使用先前使用的RDD来运行新的Spark作业,因此您会看到相同的阶段。
为什么阶段4和9跳过
通常情况下,星火会跳过某些阶段的执行。灰色阶段是已经计算出来的阶段,所以Spark会重用它们,从而使性能更好。
我怎么知道会比作业只看到(执行任何东西之前)Java代码更什么阶段?
这就是RDD谱系(图)提供的。
scala> people.leftOuterJoin(cities).toDebugString
res15: String =
(3) MapPartitionsRDD[99] at leftOuterJoin at <console>:28 []
| MapPartitionsRDD[98] at leftOuterJoin at <console>:28 []
| CoGroupedRDD[97] at leftOuterJoin at <console>:28 []
+-(2) MapPartitionsRDD[81] at distinct at <console>:27 []
| | ShuffledRDD[80] at distinct at <console>:27 []
| +-(2) MapPartitionsRDD[79] at distinct at <console>:27 []
| | MapPartitionsRDD[78] at map at <console>:24 []
| | people.csv MapPartitionsRDD[77] at textFile at <console>:24 []
| | people.csv HadoopRDD[76] at textFile at <console>:24 []
+-(3) MapPartitionsRDD[84] at map at <console>:29 []
| cities.csv MapPartitionsRDD[83] at textFile at <console>:29 []
| cities.csv HadoopRDD[82] at textFile at <console>:29 []
正如你可以看到自己,你将最终获得4个阶段,因为有3间洗牌的依赖关系(与分区数的边缘)。
圆括号中的数字是DAGScheduler
最终将用于创建具有确切数量任务的任务集的分区数。每个阶段一个TaskSet
。