在Celery中,我正在运行一项主要任务,为查询中的每个项目运行一个子任务。子任务应该并行运行。在UI上,我有一个进度条,显示总共完成了多少个子任务。我正在更新主任务状态以将信息提供给进度条。我的问题是,在将所有子任务推送给经纪人后,主任务立即结束,因此我无法再更新他的状态。我希望主要任务可以等到所有子任务完成。可能吗?其他解决方案?这是我的伪代码(真实代码不使用全局;-))。在芹菜如何更新主任务的状态,直到他所有的子任务完成?
total = 0
done = 0
@task(ignore_result=True)
def copy_media(path):
global total, done
copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
documents = Document.objects.all()
total = documents.count()
copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
for document in documents:
process_doc.delay(document, path, copy_media)
@task(ignore_result=True)
def process_doc(document, path, copy_media):
global total, done
# Do some stuff
done += 1
copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
正如文档中明确指出的那样:“让一个任务等待另一个任务的结果真的是效率低下,甚至在工作池耗尽时甚至会导致死锁 例如通过使用回调使设计异步。 “ http://celery.readthedocs.org/en/latest/userguide/tasks.html#task-synchronous-subtasks – antoinet 2015-06-25 15:09:24
我的主要任务'copy_media'没有等待另一个任务的结果。它不断更新状态以显示完成了多少子任务等。子任务并行运行,因此回调不是一个选项。最重要的是,我不能有死锁,因为'copy_media'一次只能运行一个,所以它只是阻止1个工作者。 – Etienne 2015-06-25 18:03:44
这是有效地等待其他任务的结果。你正在调用results.ready(),并且有一个任务正在测试其他人。如果你的工作人员疲惫不堪,你就会陷入僵局,因为没有任何子任务会被执行,你的主要任务将永远不会结束。 – rsalmei 2016-10-03 17:31:41