2013-02-25 61 views
1

我有一个mongoDB集合中有几十万个文档可以访问,它遵循名为MyDoc的MongoEngine Schema。有一些东西(让我们称之为my_operation)在每个文档上运行。另外my_operation要求(只是读取,不改变)通过create_data_dict函数构造的OrderedDict,称为data_dict。我希望能够通过芹菜工人并行运行my_operation。如何使用在多台机器上运行的芹菜工作器并行处理一组对象?

设置包括django,mongo,mongoengine和芹菜。

选项1:

@celery.task() 
def my_operation(my_doc_list): 
    data_dict = create_data_dict() 
    for doc in my_doc_list: 
     do_something_to_doc(data_dict, doc) 
     doc.save() 

def create_data_dict(): 
    #blah blah blah 
    return data_dict 

#So I run everything like this: 
batch_size = len(MyDoc.objects)/no_of_celery_workers 
for b in xrange(0, len(MyDoc.objects), batch_size): 
    my_operation.delay(MyDoc.objects[b:b+batch_size]) 

选项2:my_operation需要data_dict和MyDoc实例

@celery.task() 
def my_operation(data_dict, my_doc): 
    do_something_to_doc(data_dict, my_doc) 
    my_doc.save() 

def create_data_dict(): 
    #blah blah blah 
    return data_dict 

#So I run everything like this: 
data_dict = create_data_dict() 
celery.group([my_operation.s(data_dict, my_doc) for my_doc in MyDoc.objects]).apply_async() 

选项3:

@celery.task() 
def my_operation(my_doc): 
    data_dict = create_data_dict() 
    do_something_to_doc(data_dict, my_doc) 
    my_doc.save() 

def create_data_dict(): 
    #blah blah blah 
    return data_dict 

#So I run everything like this: 
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async() 

选项4:

@celery.task() 
def my_operation(my_doc): 
    data_dict = get_data_dict() 
    do_something_to_doc(data_dict, my_doc) 
    my_doc.save() 

def create_data_dict(): 
    #blah blah blah 
    return data_dict 

def get_data_dict(): 
    data_dict = cache.get("data_dict") 
    if data_dict is None: 
     data_dict = create_data_dict() 
     cache.set("data_dict", data_dict) 
    return data_dict 

#So I run everything like this: 
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async() 

如果有Option1的工作,我可能不会问这个问题,但唉,我不能传递切片的查询集响应,或查询自己芹菜工人,因为他们不pickleable。这就是回溯似乎主要说的。

随着Option2我会最终传递data_dict与每个任务,并以某种方式听起来不太吸引人。如果我在多台机器上运行芹菜工作器(我打算这么做),data_dict本质上只需要传递一次就会浪费大量的网络资源。

而在Option3的情况下,为每个文档重新创建data_dict,现在这似乎是对处理能力的巨大浪费。选项4:我使用缓存来备份data_dict,而不是重新计算它,或者用每个文档重新传输它。这听起来像是最好的想法,但有一个问题。下次我想在所有MyDoc上执行my_operation时,我希望重新计算data_dict,而不管它是否存在于缓存中。有没有办法实现这一点?

问题: 这样做的最佳方法是什么?

回答

0

表面选项2听起来就像选项1 - 当您传递对象或其数据时。

这里有很多未知数,但考虑到你在选项4中提到可能会有一些缓存争用/竞争条件,我可能会选择每次生成数据并传递对象ID,或者如果这样做是过分的昂贵的我会实现一个集合来存储缓存的数据,然后清理缓存(findAndModify以停止竞争条件)作为任务的一部分。

相关问题