2016-12-31 113 views
4

我想了解如何在火花中定义工作和阶段,并且为此我正在使用我发现的代码here并激发用户界面。为了在火花UI上看到它,我不得不多次复制和粘贴文件上的文本,因此需要更多时间来处理。Spark如何管理阶段?

这里是火花UI的输出:

Jobs

现在,据我所知,有三个工作,因为有三个动作,也该阶段产生的,因为洗牌动作,但我不明白为什么在作业1阶段4,5和6与作业0的阶段0,1和2相同,并且作业2也是如此。

我怎样才能知道什么阶段将在不仅仅是只看到java代码的工作(在执行任何事情之前)?而且,为什么跳过阶段4和9,我怎么知道它会在执行之前发生?

回答

6

据我所知,有三个工作,因为有三个动作

我甚至会说,有可能是更多的星火工作,但最小值是3。这一切都取决于变革的实施和所采取的行动。

我不理解的是为什么在作业1个阶段4,5和6是相同的阶段0,1和作业0的2和同样的情况对于工作2.

作业1是在RDD上运行的一些操作的结果,finalRdd。该RDD是使用(向后退出)创建的:join,textFile,mapdistinct

val people = sc.textFile("people.csv").map { line => 
    val tokens = line.split(",") 
    val key = tokens(2) 
    (key, (tokens(0), tokens(1))) }.distinct 

val cities = sc.textFile("cities.csv").map { line => 
    val tokens = line.split(",") 
    (tokens(0), tokens(1)) 
} 

val finalRdd = people.join(cities) 

运行上述,你会看到相同的DAG。现在

Job 6

,当你执行leftOuterJoinrightOuterJoin行动,你会得到另外两个的DAG。您正在使用先前使用的RDD来运行新的Spark作业,因此您会看到相同的阶段。

为什么阶段4和9跳过

通常情况下,星火会跳过某些阶段的执行。灰色阶段是已经计算出来的阶段,所以Spark会重用它们,从而使性能更好。

Job 7

我怎么知道会比作业只看到(执行任何东西之前)Java代码更什么阶段?

这就是RDD谱系(图)提供的。

scala> people.leftOuterJoin(cities).toDebugString 
res15: String = 
(3) MapPartitionsRDD[99] at leftOuterJoin at <console>:28 [] 
| MapPartitionsRDD[98] at leftOuterJoin at <console>:28 [] 
| CoGroupedRDD[97] at leftOuterJoin at <console>:28 [] 
+-(2) MapPartitionsRDD[81] at distinct at <console>:27 [] 
| | ShuffledRDD[80] at distinct at <console>:27 [] 
| +-(2) MapPartitionsRDD[79] at distinct at <console>:27 [] 
|  | MapPartitionsRDD[78] at map at <console>:24 [] 
|  | people.csv MapPartitionsRDD[77] at textFile at <console>:24 [] 
|  | people.csv HadoopRDD[76] at textFile at <console>:24 [] 
+-(3) MapPartitionsRDD[84] at map at <console>:29 [] 
    | cities.csv MapPartitionsRDD[83] at textFile at <console>:29 [] 
    | cities.csv HadoopRDD[82] at textFile at <console>:29 [] 

正如你可以看到自己,你将最终获得4个阶段,因为有3间洗牌的依赖关系(与分区数的边缘)。

圆括号中的数字是DAGScheduler最终将用于创建具有确切数量任务的任务集的分区数。每个阶段一个TaskSet