我有一个mongoDB集合中有几十万个文档可以访问,它遵循名为MyDoc的MongoEngine Schema。有一些东西(让我们称之为my_operation)在每个文档上运行。另外my_operation要求(只是读取,不改变)通过create_data_dict函数构造的OrderedDict,称为data_dict。我希望能够通过芹菜工人并行运行my_operation。如何使用在多台机器上运行的芹菜工作器并行处理一组对象?
设置包括django,mongo,mongoengine和芹菜。
选项1:
@celery.task()
def my_operation(my_doc_list):
data_dict = create_data_dict()
for doc in my_doc_list:
do_something_to_doc(data_dict, doc)
doc.save()
def create_data_dict():
#blah blah blah
return data_dict
#So I run everything like this:
batch_size = len(MyDoc.objects)/no_of_celery_workers
for b in xrange(0, len(MyDoc.objects), batch_size):
my_operation.delay(MyDoc.objects[b:b+batch_size])
选项2:my_operation需要data_dict和MyDoc实例
@celery.task()
def my_operation(data_dict, my_doc):
do_something_to_doc(data_dict, my_doc)
my_doc.save()
def create_data_dict():
#blah blah blah
return data_dict
#So I run everything like this:
data_dict = create_data_dict()
celery.group([my_operation.s(data_dict, my_doc) for my_doc in MyDoc.objects]).apply_async()
选项3:
@celery.task()
def my_operation(my_doc):
data_dict = create_data_dict()
do_something_to_doc(data_dict, my_doc)
my_doc.save()
def create_data_dict():
#blah blah blah
return data_dict
#So I run everything like this:
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async()
选项4:
@celery.task()
def my_operation(my_doc):
data_dict = get_data_dict()
do_something_to_doc(data_dict, my_doc)
my_doc.save()
def create_data_dict():
#blah blah blah
return data_dict
def get_data_dict():
data_dict = cache.get("data_dict")
if data_dict is None:
data_dict = create_data_dict()
cache.set("data_dict", data_dict)
return data_dict
#So I run everything like this:
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async()
如果有Option1的工作,我可能不会问这个问题,但唉,我不能传递切片的查询集响应,或查询自己芹菜工人,因为他们不pickleable。这就是回溯似乎主要说的。
随着Option2我会最终传递data_dict与每个任务,并以某种方式听起来不太吸引人。如果我在多台机器上运行芹菜工作器(我打算这么做),data_dict本质上只需要传递一次就会浪费大量的网络资源。
而在Option3的情况下,为每个文档重新创建data_dict,现在这似乎是对处理能力的巨大浪费。选项4:我使用缓存来备份data_dict,而不是重新计算它,或者用每个文档重新传输它。这听起来像是最好的想法,但有一个问题。下次我想在所有MyDoc上执行my_operation时,我希望重新计算data_dict,而不管它是否存在于缓存中。有没有办法实现这一点?
问题: 这样做的最佳方法是什么?