2017-02-28 63 views
2

我一直在尝试将Luigi作为我们的工作流程处理程序。目前我们正在使用concourse,但是我们试图做的很多事情都是在大厅中散步,所以我们转而使用Luigi作为依赖管理器。到目前为止没有问题,工作流程触发并正确执行。Python中的事件处理Luigi

当任务因任何原因失败时出现问题。这种情况特别是需要任务块,但所有情况都需要照顾。截至目前,Luigi优雅地处理了错误并将其写入STDOUT。它仍然会发出并退出代码0,这意味着作业已通过。假阳性。

我一直试图让事件处理来解决这个问题,但我不能让它触发,即使有一个非常简单的工作:

@luigi.Task.event_handler(luigi.Event.FAILURE) 
def mourn_failure(task, exception): 
    with open('/root/luigi', 'a') as f: 
     f.write("we got the exception!") #testing in concourse image 
    sys.exit(luigi.retcodes.retcode().unhandled_exception) 

class Test(luigi.Task): 
    def requires(self): 
     raise Exception() 
     return [] 

    def run(self): 
     pass 

    def output(self): 
     return [] 

然后在Python shell中运行命令

luigi.run(main_task_cls=Test, local_scheduler=True)

异常升起,但偶不发生或什么。 该文件不被写入和退出代码仍为0

而且,如果它使在其中包含

[retcode] 
already_running=10 
missing_data=20 
not_run=25 
task_failed=30 
scheduling_error=35 
unhandled_exception=40 

/etc/luigi/client.cfg的差异,我有我路易吉配置我不知道为什么事件处理程序不会触发,但不知何故,我需要该过程在发生错误时失败。

+0

嗯我不太确定如何使用luigi,但基本上,当测试失败时,您将需要一个非零的退出代码... 大厅事件流程非常简单。 1.运行您的task.yml中指定的脚本 2.根据脚本的退出代码转为绿色或红色 –

回答

0

看起来问题在于你放置“引发异常”调用。 如果将它放在require函数中 - 它基本上会在测试任务运行方法之前运行。所以它不像你的测试任务失败,而是它依赖的任务(现在,空白......)。

例如,如果您将加注运行,您的代码将按照您的预期运行。

def run(self): 
     print('start') 
     raise Exception() 

到了那里你所依赖的失败处理的情况下(在这种情况下,引发异常的需要方法),您可以添加其他类型的路易吉事件处理程序,BROKEN_TASK的:luigi.Event.BROKEN_TASK。 这将确保luigi代码发出您期望的返回码(不同于0)。

干杯!

0

如果我的理解正确,你只是想让luigi在任务失败时返回一个错误代码,我对这个有很多问题,但事实证明这很简单,你只需要用luigi就可以运行它命令行,而不是python。就像这样:

luigi --module my_module MyTask

我不知道,如果这是你的问题太多,但我使用python运行,然后路易吉忽略对luigi.cfg的retcodes。希望能帮助到你。