2015-10-13 67 views
10

说我有两个芹菜任务:如何芹菜的Task.map处理错误

@celery.task 
def run_flakey_things(*args, **kwargs): 
    return run_flakey_and_synchronous_thing.map(
     xrange(10) 
    ).apply_async() 


@celery.task 
def run_flakey_and_synchronous_thing(a): 
    if a % 5: 
     return a 
    raise RuntimeError(a) 

所以,当你去跑run_flakey_things因为序列中的第一项引发一个异常,它会翻倒直线距离。我想按照地图的顺序为的顺序为所有项目运行任务,但继续在异常上运行,并在所有这些完成后引发新的异常。

理想的做法是,如果我能在应用之前的ON_FAILURE添加到xmap对象,但xmap没有出现是一个完整的任务目标。

回答

0

您可以更改您的返回值以指示并传播错误。像这样:

import traceback 

@celery.task 
def run_flakey_things(*args, **kwargs): 
    return run_flakey_and_synchronous_thing.map(
     xrange(10) 
    ).apply_async() 


@celery.task 
def run_flakey_and_synchronous_thing(a): 
    d = {'value': None, 'error': None} 
    try: 
     if a % 5: 
      d['value'] = a 
    except: 
     d['error'] = traceback.format_exc() 
    return d 

然后,您可以做几件事情:

1)改变你的run_flakey_things有和没有错误组的事情,没有错误返回的人,并报告有错误的人。

2)处理任何调用的行为run_flakey_things

0

我没有用过芹菜。但是,您可以添加一个__call__方法到子任务吗?喜欢的东西:

class MyTask: 
    def __call__(self, *args, **kwargs): 
     try: 
      super(self, MyTask).__call__(*args, **kwargs) 
     except Exception, exc: 
      # store exc to be raised later in the main task 

@celery.task(base=MyTask): 
def run_flakey_and_synchronous_thing(a): 
    # ... 

然后主run_flakey_things任务,或许覆盖apply_async()检索存储的例外,因为在How to override __call__ in celery on main?