2012-04-06 54 views
3

在Celery中,我正在运行一项主要任务,为查询中的每个项目运行一个子任务。子任务应该并行运行。在UI上,我有一个进度条,显示总共完成了多少个子任务。我正在更新主任务状态以将信息提供给进度条。我的问题是,在将所有子任务推送给经纪人后,主任务立即结束,因此我无法再更新他的状态。我希望主要任务可以等到所有子任务完成。可能吗?其他解决方案?这是我的伪代码(真实代码不使用全局;-))。在芹菜如何更新主任务的状态,直到他所有的子任务完成?

total = 0 
done = 0 

@task(ignore_result=True) 
def copy_media(path): 
    global total, done 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    documents = Document.objects.all() 
    total = documents.count() 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    for document in documents: 
     process_doc.delay(document, path, copy_media) 

@task(ignore_result=True) 
def process_doc(document, path, copy_media): 
    global total, done 
    # Do some stuff 
    done += 1 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 

回答

0

我找到了一种方法,使用TaskSet。但我并不完全满意,因为我不能忽视子任务的结果。如果我不理会导致对process_doc任务results.ready()总是返回Falseresults.completed_count()总是返回0,等下面的代码:

@task(ignore_result=True) 
def copy_media(path): 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    documents = Document.objects.all() 
    total = documents.count() 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    job = TaskSet(tasks=[process_doc.subtask((document, path)) 
         for document in documents]) 
    results = job.apply_async() 
    doc_name = '' 
    while not results.ready(): 
     done = results.completed_count() 
     if done: 
      last = done - 1 
      for idx in xrange(last, -1, -1): 
       if results[idx].ready(): 
        doc_name = results[idx].result 
        break 
     copy_media.update_state(state=STARTED, meta={'total': total, 'done': done, 'doc-name': doc_name}) 
     time.sleep(0.25) 

@task() 
def process_doc(document, path): 
    # Do some stuff 
    return document 
+1

正如文档中明确指出的那样:“让一个任务等待另一个任务的结果真的是效率低下,甚至在工作池耗尽时甚至会导致死锁 例如通过使用回调使设计异步。 “ http://celery.readthedocs.org/en/latest/userguide/tasks.html#task-synchronous-subtasks – antoinet 2015-06-25 15:09:24

+0

我的主要任务'copy_media'没有等待另一个任务的结果。它不断更新状态以显示完成了多少子任务等。子任务并行运行,因此回调不是一个选项。最重要的是,我不能有死锁,因为'copy_media'一次只能运行一个,所以它只是阻止1个工作者。 – Etienne 2015-06-25 18:03:44

+0

这是有效地等待其他任务的结果。你正在调用results.ready(),并且有一个任务正在测试其他人。如果你的工作人员疲惫不堪,你就会陷入僵局,因为没有任何子任务会被执行,你的主要任务将永远不会结束。 – rsalmei 2016-10-03 17:31:41

0

可以使用memcached的支持缓存来的完成任务门店数量。在django cache API中甚至有cache.inrc用于原子增量,以确保count的并发更新不会造成干扰。

此外,主要任务运行,直到所有子任务完成是一个坏主意,因为你基本上阻塞了芹菜工作很长一段时间。如果芹菜运行一个工作进程,这将导致永无止境的锁定。

+0

关于您使用Django缓存存储计数的建议,我发现奇怪的是必须重新实现Celery中已有的内容,即。一个国家体系。而且我的需求比较复杂,只保留点数。正如你在我的回答中看到的那样,我还传递了文档名称(以及更多内容)。由于我的主要任务是阻止一名芹菜工人,所以我可以看到这个问题,但对我来说绝对不是问题。我为这个主要任务提供了一个专用的Celery守护进程,并且与许多工作人员一起执行了子任务,并且阻止了主要任务同时运行。 – Etienne 2012-04-07 15:54:07

0

我不知道你正在运行哪个版本的芹菜,但你可以看看Group子任务(3.0版本的新功能)。

相关问题