2014-10-06 75 views
9

我有几个基本的问题,当涉及到使用Python的multiprocessing模块:我是否需要显式地将multiprocessing.Queue实例变量传递给子进程在实例方法上执行?

class Someparallelworkerclass(object) : 

    def __init__(self): 
     self.num_workers = 4 
     self.work_queue = multiprocessing.JoinableQueue() 
     self.result_queue = multiprocessing.JoinableQueue() 

    def someparallellazymethod(self): 
     p = multiprocessing.Process(target=self.worktobedone).start() 

    def worktobedone(self): 
     # get data from work_queue 
     # put back result in result queue 

是否有必要通过work_queueresult_queueargsProcess?答案取决于操作系统吗?更基本的问题是:子进程是否从父进程获得复制(COW)地址空间,并且因此知道类/类方法的定义?如果是,它如何知道这些队列将被IPC共享,并且它不应该在子进程中复制work_queueresult_queue?我试着在网上搜索,但是我发现的大部分文档都是模糊的,并没有详细描述底下究竟发生了什么。

回答

-1

子进程没有得到复制的地址空间。这个孩子是一个完全独立的python进程,没有任何共享。是的,你必须把队列传给孩子。当你这样做时,多处理自动处理通过IPC的共享。见https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

+0

这并不完全正确。在Linux上,子进程是从父进程派生的,所以它实际上从父进程获得了写时复制地址空间。实际上,您可以''将'放入'子中的队列,并从父节点'获取'结果,而不必在Linux *和* Windows上将'Queue'明确地传递给子节点。它似乎没有工作的唯一情况是使用“spawn”或“forkserver”上下文的Linux 3.4+。 – dano 2014-10-07 15:25:49

+0

其实,我收回最后一句话。这是由我的错误造成的。无论上下文/平台如何,您都可以隐式传递Queue对象。 – dano 2014-10-07 16:44:09

+0

+ dano,很好的答案。你显然是一位多处理的专家! – Matt 2014-10-08 19:59:22

7

在这种情况下,实际上不需要在args参数中包含队列,无论您使用的是哪种平台。原因是,即使它看起来并不像你明确地将两个JoinableQueue实例传递给孩子,但实际上是 - 通过self。因为self明确地被传递给孩子,并且两个队列是self的一部分,他们最终被传递给孩子。

在Linux中,这通过0​​发生,这意味着由该Queue内部使用用于进程间通信的multiprocessing.connection.Connection对象使用文件描述符继承由子(未复制)。 Queue的其他部分变成copy-on-write,但没关系; multiprocessing.Queue被设计为使得需要被复制的片段实际上不需要在两个过程之间保持同步。事实上,许多内部属性得到复位发生后fork

def _after_fork(self): 
    debug('Queue._after_fork()') 
    self._notempty = threading.Condition(threading.Lock()) 
    self._buffer = collections.deque() 
    self._thread = None 
    self._jointhread = None 
    self._joincancelled = False 
    self._closed = False 
    self._close = None 
    self._send = self._writer.send # _writer is a 
    self._recv = self._reader.recv 
    self._poll = self._reader.poll 

所以,涵盖Linux操作系统。 Windows如何? Windows没有fork,所以它需要腌制self将它发送给孩子,这包括酸洗我们的Queues。现在,一般来说,如果你尝试腌制一个multiprocessing.Queue,它失败:

>>> import multiprocessing 
>>> q = multiprocessing.Queue() 
>>> import pickle 
>>> pickle.dumps(q) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps 
    Pickler(file, protocol).dump(obj) 
    File "/usr/local/lib/python2.7/pickle.py", line 224, in dump 
    self.save(obj) 
    File "/usr/local/lib/python2.7/pickle.py", line 306, in save 
    rv = reduce(self.proto) 
    File "/usr/local/lib/python2.7/copy_reg.py", line 84, in _reduce_ex 
    dict = getstate() 
    File "/usr/local/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__ 
    assert_spawning(self) 
    File "/usr/local/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning 
    ' through inheritance' % type(self).__name__ 
RuntimeError: Queue objects should only be shared between processes through inheritance 

但其实这是一种人为的限制。 multiprocessing.Queue对象可以在某些情况下被腌渍 - 如何将它们发送到Windows中的子进程?事实上,我们可以看到,如果我们看看实现:

def __getstate__(self): 
    assert_spawning(self) 
    return (self._maxsize, self._reader, self._writer, 
      self._rlock, self._wlock, self._sem, self._opid) 

def __setstate__(self, state): 
    (self._maxsize, self._reader, self._writer, 
    self._rlock, self._wlock, self._sem, self._opid) = state 
    self._after_fork() 

__getstate__,酸洗一个实例时被调用,有一个assert_spawning呼叫它,这可以确保我们实际上正在产卵的过程,而试图腌菜*。 __setstate__,在取出时被调用,负责调用_after_fork

那么当我们必须保持酸菜时,队列所使用的Connection对象如何维护?原来有一个multiprocessing子模块完全符合 - multiprocessing.reduction。在模块顶部的评论指出它非常清楚:

# 
# Module to allow connection and socket objects to be transferred 
# between processes 
# 

在Windows上,该模块最终使用Windows提供的API DuplicateHandle创建重复柄子进程Connection对象可以使用。因此,虽然每个进程都有自己的句柄,但它们是完全重复的 - 任何一个动作都会反映在另一个上:

重复句柄引用与原始句柄相同的对象。 因此,对象的任何更改都通过 句柄反映出来。例如,如果复制文件句柄,则两个句柄的当前文件位置 始终相同。

*有关更多信息,请参见this answerassert_spawning

1

子进程并没有在其关闭的队列。这是队列参考不同区域内存的实例。使用队列时,您希望的方式必须将它们作为参数传递给函数。我喜欢的一种解决方案是使用functools.partial来将你的函数与你想要的队列一起嵌套,将它们永久地添加到它的闭包中,并让你启动多个线程来执行同一个IPC通道的相同任务。

相关问题