2016-11-22 103 views
1

我们目前有一个Python Apache Beam管道工作并能够在本地运行。我们现在正在使管道运行在Google Cloud Dataflow上,并且完全自动化,但是Dataflow/Apache Beam的流水线监控存在限制。Python Apache Beam Pipeline状态API调用

目前,Cloud Dataflow有两种方法可以通过它们的UI界面或命令行中的gcloud来监控您的管道状态。这两种解决方案都不适用于完全自动化的解决方案,我们可以考虑无损文件处理。

看着阿帕奇Beam的github上,他们有一个文件,internal/apiclient.py,显示有用于找工作的状态的功能,get_job

我们发现get_job的一个实例是runners/dataflow_runner.py

最终目标是使用此API来获取我们自动触发运行的一个或多个作业的状态,以确保它们最终都通过管道成功处理。

任何人都可以向我们解释在运行我们的管道(p.run())之后如何使用此API?我们不明白response = runner.dataflow_client.get_job(job_id)runner来自哪里。

如果有人可以提供更大的理解我们如何在设置/运行我们的管道时访问此API调用,那将是非常好的!

回答

1

我最终只是摆弄了代码,发现了如何获得工作细节。我们的下一步是看看是否有办法获得所有工作的清单。

# start the pipeline process 
pipeline     = p.run() 
# get the job_id for the current pipeline and store it somewhere 
job_id     = pipeline.job_id() 
# setup a job_version variable (either batch or streaming) 
job_version    = dataflow_runner.DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION 
# setup "runner" which is just a dictionary, I call it local 
local     = {} 
# create a dataflow_client 
local['dataflow_client'] = apiclient.DataflowApplicationClient(pipeline_options, job_version) 
# get the job details from the dataflow_client 
print local['dataflow_client'].get_job(job_id)