2017-02-14 105 views
9

考虑一下我在Spark中的工作如下;如何知道Apache Spark中当前正在运行哪个阶段的工作?

CSV文件 ==>过滤用一个柱 ==>以样品 ==>另存为JSON

现在我的要求就是我怎么知道哪些步骤(撷取文件Filtering or 取样)当前正在以编程方式执行(最好使用Java API)?有没有办法呢?

我可以跟踪作业,舞台和任务使用SparkListener类。它可以像跟踪阶段ID一样完成。但是如何知道哪个阶段的Id是工作链中的哪一步。

我想发送通知给用户时,考虑按列过滤完成。为此,我创建了一个扩展SparkListener类的类。但是我无法从中找到当前正在执行的转换名称的名称。是否有可能跟踪?

public class ProgressListener extends SparkListener{ 

    @Override 
    public void onJobStart(SparkListenerJobStart jobStart) 
    { 

    } 

    @Override 
    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) 
    { 
     //System.out.println("Stage Name : "+stageSubmitted.stageInfo().getStatusString()); giving action name only 
    } 

    @Override 
    public void onTaskStart(SparkListenerTaskStart taskStart) 
    { 
     //no such method like taskStart.name() 
    } 
} 
+1

关闭注释看起来不太合适:这当然是一个编程相关的问题,它在宽度/范围内似乎也是合理的。 – javadba

回答

3

您无法准确知道何时(例如)过滤器操作开始或结束。

这是因为您有转换(filter,map,...)和操作(count,foreach,...)。 Spark将尽可能多的操作放到一个阶段。然后在您输入的不同分区上并行执行舞台。问题来了。

假设你有几个工人和下面的程序

LOAD ==>地图==>过滤==> GROUP BY +聚合

这一方案将可能有两个阶段:第一阶段将加载文件并应用mapfilter。 然后输出将被混洗以创建组。在第二阶段,将执行聚合。

现在,问题是,你有几个工人,每个人都会并行处理一部分输入数据。也就是说,群集中的每个执行程序都会收到程序的副本(当前阶段)并在分配的分区上执行此操作。

您会看到,您将有多个并行执行的mapfilter运算符的实例,但不一定同时执行。在极端情况下,工人1将在工人20开始工作之前完成阶段1(并且因此在工人20之前完成其filter操作)。

对于RDD Spark在舞台中使用iterator model。但是,对于最新的Spark版本中的数据集,它们会在分区上创建一个循环并执行转换。这意味着在这种情况下,Spark本身并不知道转换操作符何时完成单个任务!

长话短说:

  1. 你是不是能够在知道当一个阶段内的操作完成
  2. ,即使你可以有多个实例,将在不同的时间完成。

所以,现在我已经有同样的问题:

在我们Piglet project(请允许一些adverstisement ;-)),我们生成的Pig Latin脚本星火代码,并希望配置文件的脚本。我最终在所有用户操作员之间插入mapPartition运营商,该用户运营商将发送分区ID和当前时间给将评估消息的服务器。但是,这个解决方案也有其局限性......我还没有完全满意。

但是,除非你能够修改程序,恐怕你不能达到你想要的。

0

你考虑这个选项:http://spark.apache.org/docs/latest/monitoring.html
看来你可以使用下面的REST API,以获得一定的工作状态/应用/ [APP-ID] /职位/ [作业ID]

您可以设置JobGroupId和JobGroupDescription,以便跟踪正在处理的作业组。即setJobGroup

假设你会打电话给JobGroupId“测试”

sc.setJobGroup("1", "Test job") 

当你调用http://localhost:4040/api/v1/applications/[app-id]/jobs/[job-id]

你会得到一个JSON与该职位的描述性名称:

{ 
    "jobId" : 3, 
    "name" : "count at <console>:25", 
    "description" : "Test Job", 
    "submissionTime" : "2017-02-22T05:52:03.145GMT", 
    "completionTime" : "2017-02-22T05:52:13.429GMT", 
    "stageIds" : [ 3 ], 
    "jobGroup" : "1", 
    "status" : "SUCCEEDED", 
    "numTasks" : 4, 
    "numActiveTasks" : 0, 
    "numCompletedTasks" : 4, 
    "numSkippedTasks" : 0, 
    "numFailedTasks" : 0, 
    "numActiveStages" : 0, 
    "numCompletedStages" : 1, 
    "numSkippedStages" : 0, 
    "numFailedStages" : 0 
} 
+0

我没有尝试,因为在Spark Java API中有一个函数可以复制REST API的每个端点。我在Java API中尝试了所有这些功能。您能否告诉我您认为可以解决问题的REST API的哪个终点?然后,我可以使用Java API在此处发布该端点的输出。 –

+0

JSON与我得到的完全一样。但我怎么理解“jobId”:3是为“filterByAColumn”或“takingSample”这样的步骤? –

+0

为什么你不能使用jobGroup或JobDescription? – liorsolomon

相关问题