2012-07-16 75 views
13

我有一个芹菜链运行一些任务。每个任务都可能失败并被重试。请参阅下面一个简单的例子:重试芹菜失败的任务是链条的一部分

from celery import task 

@task(ignore_result=True) 
def add(x, y, fail=True): 
    try: 
     if fail: 
      raise Exception('Ugly exception.') 
     print '%d + %d = %d' % (x, y, x+y) 
    except Exception as e: 
     raise add.retry(args=(x, y, False), exc=e, countdown=10) 

@task(ignore_result=True) 
def mul(x, y): 
    print '%d * %d = %d' % (x, y, x*y) 

和链条:

from celery.canvas import chain 
chain(add.si(1, 2), mul.si(3, 4)).apply_async() 

运行两个任务(并假设没有失败),你会得到/看印刷:

1 + 2 = 3 
3 * 4 = 12 

但是,当添加任务第一次失败并在后续的重试调用中成功时,链中的其余任务不会运行,即添加任务失败,链中的所有其他任务都不会运行,并且在af几秒后,添加任务再次运行并成功,链中的其余任务(本例中为mul.si(3,4))不会运行。

芹菜是否提供了一种方法来继续从失败的任务中继续失败链?如果不是,那么完成此操作的最佳方法是什么,并确保链的任务按照指定的顺序运行,并且只有在前一个任务成功执行后才会执行,即使任务重试了几次也是如此。

注1:问题可以通过做

add.delay(1, 2).get() 
mul.delay(3, 4).get() 

来解决,但我想了解为何链不失败的任务。

回答

0

我也有兴趣了解为什么连锁不能在失败的任务中工作。

我挖一些芹菜代码,以及到目前为止,我发现是:

实施happends在app.builtins.py

@shared_task 
def add_chain_task(app): 
    from celery.canvas import chord, group, maybe_subtask 
    _app = app 

    class Chain(app.Task): 
     app = _app 
     name = 'celery.chain' 
     accept_magic_kwargs = False 

     def prepare_steps(self, args, tasks): 
      steps = deque(tasks) 
      next_step = prev_task = prev_res = None 
      tasks, results = [], [] 
      i = 0 
      while steps: 
       # First task get partial args from chain. 
       task = maybe_subtask(steps.popleft()) 
       task = task.clone() if i else task.clone(args) 
       i += 1 
       tid = task.options.get('task_id') 
       if tid is None: 
        tid = task.options['task_id'] = uuid() 
       res = task.type.AsyncResult(tid) 

       # automatically upgrade group(..) | s to chord(group, s) 
       if isinstance(task, group): 
        try: 
         next_step = steps.popleft() 
        except IndexError: 
         next_step = None 
       if next_step is not None: 
        task = chord(task, body=next_step, task_id=tid) 
       if prev_task: 
        # link previous task to this task. 
        prev_task.link(task) 
        # set the results parent attribute. 
        res.parent = prev_res 

       results.append(res) 
       tasks.append(task) 
       prev_task, prev_res = task, res 

      return tasks, results 

     def apply_async(self, args=(), kwargs={}, group_id=None, chord=None, 
       task_id=None, **options): 
      if self.app.conf.CELERY_ALWAYS_EAGER: 
       return self.apply(args, kwargs, **options) 
      options.pop('publisher', None) 
      tasks, results = self.prepare_steps(args, kwargs['tasks']) 
      result = results[-1] 
      if group_id: 
       tasks[-1].set(group_id=group_id) 
      if chord: 
       tasks[-1].set(chord=chord) 
      if task_id: 
       tasks[-1].set(task_id=task_id) 
       result = tasks[-1].type.AsyncResult(task_id) 
      tasks[0].apply_async() 
      return result 

     def apply(self, args=(), kwargs={}, **options): 
      tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']] 
      res = prev = None 
      for task in tasks: 
       res = task.apply((prev.get(),) if prev else()) 
       res.parent, prev = prev, res 
      return res 
    return Chain 

你可以看到,在年底prepare_stepsprev_task链接到下一个任务。 当prev_task失败时,未调用下一个任务。

我与添加从上一个任务link_error到下一个测试:

if prev_task: 
    # link and link_error previous task to this task. 
    prev_task.link(task) 
    prev_task.link_error(task) 
    # set the results parent attribute. 
    res.parent = prev_res 

但随后,接下来的任务,必须照顾这两种情况下(也许当它配置为不可变的,除了如不接受更多的论据)。

我觉得链可以通过允许一些语法的支持,喜欢这样的:

c = chain(t1, (t2, t1e), (t3, t2e))

这意味着:

t1linkt2link_errort1e

t2linkt3link_errort2e

+0

我决定使用运行,否则将在链中的所有任务的连锁任务,而是等待一个任务开始在另一个之前完成,如:'task1.delay([PARAMS])。得到(); 。task2.delay([PARAMS])得到(); task3.delay([PARAMS])。得到()'。链式任务可以捕获任何任务引发的异常并重试自身。 – Andrei 2012-07-25 09:45:13

+0

因此,从你的例子中,t1e和t2e必须分别调用t2和t3,对吧? – Andrei 2012-07-25 09:51:17

+0

这个例子只是我对可能的链式语法的思考。这意味着每个接下来的任务,现在确实是对的任务,如果在上一步中没有异常发生/错误对中第一个元素将被调用,第二个元素是上一步骤的失败异常/错误处理程序。 't1e'的意思是't1错误处理程序' – anh 2012-07-25 10:01:54