2012-04-12 68 views
31

我正在尝试使用多进程池对象。我希望每个进程在启动时都打开数据库连接,然后使用该连接来处理传入的数据(而不是打开和关闭每个数据位的连接)。这似乎是初始化程序的作用因为,但我无法围绕工作人员和初始化程序如何沟通。所以我有这样的事情:如何使用初始化设置我的多进程池?

def get_cursor(): 
    return psycopg2.connect(...).cursor() 

def process_data(data): 
    # here I'd like to have the cursor so that I can do things with the data 

if __name__ == "__main__": 
    pool = Pool(initializer=get_cursor, initargs=()) 
    pool.map(process_data, get_some_data_iterator()) 

我怎么(或我)从get_cursor()返回光标到process_data()?

回答

63

初始化功能是因此被称为:

def worker(...): 
    ... 
    if initializer is not None: 
     initializer(*args) 

所以保存任何地方没有返回值。你可能认为这会让你失望,但不是!每个工人都在一个独立的过程中。因此,您可以使用普通的global变量。

这不完全是漂亮,但它的工作原理:

cursor = None 
def set_global_cursor(...): 
    global cursor 
    cursor = ... 

现在你可以在你的process_data功能只需使用cursor。每个独立进程中的变量cursor与所有其他进程是分开的,因此它们不会互相踩在一起。

(我不知道psycopg2是否有不同的方式来解决这个问题不涉及在第一时间使用multiprocessing;这意味着作为一个笼统的回答与multiprocessing模块的一个普遍问题)

+8

这应该是接受的答案 – thias 2014-01-15 14:12:32

+0

@torek应该在init_worker中调用set_global_cursor吗? – 2015-06-13 07:06:51

+0

@TheUnfunCat:不知道'init_worker'是什么(我在你的答案中看到一个,但在原始问题中没有)我不能确定地说。总的想法是允许'多进程。Pool“创建一个进程池,并让每个进程创建(它自己的私有副本)数据库连接。如果你想在池进程启动时发生这种情况,你可以使用初始化函数。如果您希望稍后发生,可以稍后再做。无论哪种方式,你需要一个持久变量,就像你的方法中的'function.cursor'或普通的'global'一样。 – torek 2015-06-14 00:40:39

4

您也可以将函数一起发送到初始化程序并在其中创建一个连接。之后,将光标添加到该功能。

def init_worker(function): 
    function.cursor = db.conn() 

现在,您可以通过function.cursor访问数据库而无需使用全局变量。

+1

你的进程命令是这样的: p = Pool(initializer = init_worker,args =(func)); p.map(func,args_set); ?? – 2015-07-31 13:32:14

+0

是的,类似的东西(我记得这个工作,但一段时间没有在相关的东西工作,所以不记得确切的细节,随时dv或修改我的答案) – 2015-07-31 16:49:12

7

torek已经给出了一个很好的解释,为什么初始化器在这种情况下不起作用。然而,我个人并不喜欢全局变量,所以我想在这里粘贴另一个解决方案。

想法是使用一个类来包装函数并用“全局”变量初始化类。

class Processor(object): 
    """Process the data and save it to database.""" 

    def __init__(self, credentials): 
    """Initialize the class with 'global' variables""" 
    self.cursor = psycopg2.connect(credentials).cursor() 

    def __call__(self, data): 
    """Do something with the cursor and data""" 
    self.cursor.find(data.key) 

然后用

p = Pool(5) 
p.map(Processor(credentials), list_of_data) 

调用,这样第一个参数初始化类证书,返回类的实例,并调用地图数据的实例。

尽管这不像全局变量解决方案那么直截了当,但我强烈建议避免全局变量并以某种安全方式封装变量。 (我真的希望他们可以在一天内支持lambda表达式,这会让事情变得简单...)

+0

我喜欢这个答案,因为它很漂亮,但它不会重新连接列表中的每个项目? – woot 2016-05-27 06:42:07

+6

它通常很好地避免了全局变量,你可以做这样的事情,但是你会推迟初始化'self.cursor',直到'p.map'实际上已经启动了流程实例。也就是说,你的'__init__'只会设置为'None','__call__'会说'如果self.cursor是None:self.cursor = ...'。最后,我们真正需要的是一个单进程单例。 – torek 2016-07-23 08:13:02

+0

您也可以尝试使用带有线程/进程id的映射作为关键字,以便完全隔离每个线程/进程的连接。 – 2017-03-07 16:35:37

相关问题