2014-09-26 100 views
1

我有一堆可以同时执行的任务,但是一旦一切就绪,我想执行最后一个任务。我使用下面的代码:芹菜和弦在所有任务完成之前执行

chunk_tasks = [] 
for index, chunk in enumerate(chunks): 
    chunk_tasks.append(import_chunk.s(meta.pk)) 

g = group(chunk_tasks) 
chord(g)(import_completed.s(meta.pk, max_lines=max_lines)) 

但是它看起来像完成所有任务之前import_completed执行。 import_chunk任务看起来像:

@task(bind=True, ignore_result=IGNORE_RESULTS) 
def import_chunk(self, meta_pk): 
    try: 
     # do some stuff 
    except Exception, e: 
     if self.max_retries == self.request.retries: 
      logger.exception('Unexpected error in import_chunk') 
     raise self.retry(countdown=60, max_retries=3) 

所以问题是我在做什么错了?

回答

0

和弦是只有在组中的所有任务都执行完毕后才执行的任务。所以,它的需要在其头部的任务状态进行同步。

但是,当您将ignore_result设置为task时,工作人员将不存储任务状态并返回此任务的值。

这将导致根据您的工作流程重试任务或抛出异常或任何故障。

所以,chord(add.s(i, i) for i in range(10))(tsum.s()).get()是完全有效的,并给出结果的情形1,但它给出了CASE 2.一些麻烦

案例1:

@app.task 
def add(x, y): 
    return x + y 

@app.task 
def tsum(numbers): 
    return sum(numbers) 

案例2:

@app.task(ignore_result=True) 
def add(x, y): 
    return x + y 

@app.task(ignore_result=True) 
def tsum(numbers): 
    return sum(numbers) 

所以,你必须改变ignore_result或改变窝你的任务的流程。

从文档:

您应该避免使用和弦尽可能。尽管如此,和弦仍然是工具箱中强大的基础,因为同步是许多并行算法的必需步骤。

+0

是的,我已经偶然发现了关于ignore_result的问题,它被设置为False。 – 2014-09-27 08:13:33

相关问题