2017-01-23 78 views
1

我正在写一些luigi工作流程,并且我正在尝试调试这些任务。为了做到这一点,我需要重复使用相同的参数反复执行这些任务,直到最终他们做我想做的事情。luigi:为调试目的重新运行任务?

据我所知,路易吉的任务是幂和给予相同的输入,年初的时候,因此通常不会重新运行,而这正是需要工作流程调试并投入生产。但是,在开发过程中,使用完全相同的输入和输出重新运行工作流程非常有用 - 而且我认为这是必要的。

我知道我可以重写complete()方法在开发过程中在每个任务中返回False。但是,这会使任务处于未完成状态。

我正在寻找一种方法来设置我的工作流以某种“开发”或“调试”模式运行,因此我可以反复运行并重新运行它以完成,即使所有任务运行正常,直到我确定工作流程正在按照我的需要进行。

有没有办法在luigi做到这一点?

预先感谢您。

================后来添加================

每低于我的意见,看来将输入参数更改为任务不会导致重新运行。只有当其output()方法返回唯一值时,该任务才能重新运行。这似乎违背了“幂等”的定义,因为改变输入参数应该把一个真正的幂等任务当作一个新的独特的实体,而不管它是否发生了与另一个具有不同输入参数的调用相同的输出。

以下代码说明了此问题。 “x”参数确定output()方法返回的文件名,而“y”参数用于输出内容中,但不用于输出文件的名称。

如果我用“--x 10 --y 20”然后“--x 10 --y 30”调用工作流,则第二次调用不会导致任何一个任务重新运行。我相信这是不正确的行为。但是,如果我用“--x 10 --y 20”和“ - x 11 --y 20”调用工作流程,那么这两个任务确实会重新运行。

#!/usr/bin/python3                            
# -*- python -*-                             

import luigi 

class Child(luigi.Task): 

    x = luigi.Parameter() 
    y = luigi.Parameter() 

    def requires(self): 
     return [] 

    def output(self): 
     return luigi.LocalTarget("child_{}.txt".format(self.x)) 

    def run(self): 
     with self.output().open('w') as f: 
      f.write('{} {}\n'.format(self.x, self.y)) 

class Parent(luigi.Task): 

    x = luigi.Parameter() 
    y = luigi.Parameter() 

    def requires(self): 
     return [ Child(self.x, self.y) ] 

    def output(self): 
     return luigi.LocalTarget("parent_{}.txt".format(self.x)) 

    def run(self): 
     with self.input()[0].open() as fin, self.output().open('w') as fout: 
      for line in fin: 
       fout.write("from command line: --x {} --y {}, from child: {}\n".format(self.x, self.y, line.strip())) 

if __name__ == '__main__': 
    luigi.run() 

回答

0

正如你所说,调试模式将会很好。但我认为路易吉没有这样的事情。

你可以做的一个技巧是在任务调用complete()方法之前移除目标,如here所示。您的任务必须是此类的子类,因此您可以在执行之前使用--force参数进行重置。

请注意,此解决方案仅适用于您的任务具有本地文件作为输出。您必须定制它才能删除S3密钥/存储桶,数据库表或行等。

+1

非常感谢。我的应用程序正在创建S3存储桶和Redshift表,而且我确实必须删除这些项才能重新运行我的任务。不幸的是,这在某些情况下是无用的。然而,我想我可能会遇到一个解决方案:如果每次运行调试测试到我的工作流并将此参数作为参数传递给每个任务时,如果我生成一个唯一参数,那么我确实可以重新运行所有任务工作流程。我把我的任务结构化为每个将DictParameter作为参数,并在调试运行期间传入此唯一ID。它适合工作。 – HippoMan

+0

明天早上我会对此进行更全面的测试,并在此处汇报我的发现。 – HippoMan

+0

嗯,我错了。我错误地认为,通过改变任务的输入(即通过添加一个唯一的输入参数),Luigi总是会重新执行任务。但是,显然情况并非如此。在任务重新运行之前,输出必须是唯一的,而不管该任务的输入参数如何。这似乎违背了“幂等”的严格定义......但我想我们必须忍受这一点。好吧。 – HippoMan

相关问题