2017-08-29 48 views
0

我需要以下流程:芹菜 - 一个任务运行N个任务,等待他们和处理结果

  • ParentTask运行第一
  • 在某些时候,它产生的ChildTask N个实例,其运行在平行
  • ParentTask等待那些完成,收集结果,莫名其妙地处理它们,并完成

这似乎是很容易的。不幸的是,从任务中调用Task().delay()(我用它来调用任务)似乎完全被忽略。我完全迷失在这里。

如果你喜欢代码的方法更多,我也包括它。

from celery.task import Task 
from celery.result import AsyncResult 

class ParentTask(Task): 
    def run(self, *args, **kwargs): 
     # do some stuff 
     ids = [ChildTask().delay().id for _ in range(N)] # this seems to do nothing here 
     results = [AsyncResult(t) for t in ids] 
     while not all([r.ready() for r in results]): # wait for child tasks to finish 
      sleep(.100) 
     # do some stuff again 
     # return results 

class ChildTask(Task): 
    def run(self, *args, **kwargs): 
     # do some child stuff 
     # return child results 

ParentTask().delay() # this delay works fine 

感谢您的任何线索!

+0

您需要[Canvas](http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups) –

+0

我可以将这些组,链,和弦等组合在一起吗?如果是这样,你能否给我写一段代码片段来说明使用情况? – karlosss

回答

0

好吧,我明白了。工作方式可以像这样(当然,任务可以做任何需要的东西):

from time import sleep 
from celery.task import Task 
from celery import chain, group 

class PreTask(Task): 
    def run(self, *args, **kwargs): 
     x = 0 
     for i in range(100000): 
      x += 1 
     return x 


class MidTask(Task): 
    def run(self, *args, **kwargs): 
     sleep(5) 
     return 42 


class PostTask(Task): 
    def run(self, *args, **kwargs): 
     return args 


# call it like this 
res = chain(PreTask().s() | group(MidTask().s() for _ in range(5)) | PostTask().s()).apply_async() 

# and get the result for example like this 
while(True): 
    if res.ready(): 
     print(res.get()) 
    sleep(1) 

希望它可以帮助别人。