2017-07-18 66 views
2

我实际上很难相信我遇到了问题,我有,它似乎是一个在Python多处理模块中的大错误。无论如何,我遇到的问题是,只要我将multiprocessing.Queue传递给multiprocessing.Pool worker作为参数,池工作人员就不会执行其代码。即使是在python docs中找到的示例代码的稍微修改版本的非常简单的测试中,我也能够重现此错误。multiprocessing.Queue作为参数池的工人中止工作人员的执行

下面是示例代码队列的原始版本:

from multiprocessing import Process, Queue 

def f(q): 
    q.put([42, None, 'hello']) 


if __name__ == '__main__': 
    q = Queue() 
    p = Process(target=f, args=(q,)) 
    p.start() 
    print(q.get()) # prints "[42, None, 'hello']" 
    p.join() 

这里是我的队列的示例代码修改后的版本:

from multiprocessing import Queue, Pool 

def f(q): 
    q.put([42, None, 'hello']) 

if __name__ == '__main__': 
    q = Queue() 
    p = Pool(1) 
    p.apply_async(f,args=(q,)) 
    print(q.get()) # prints "[42, None, 'hello']" 
    p.close() 
    p.join() 

所有我做的是使PA大小为1的进程池,而不是多进程。进程对象,结果是代码永远挂在打印语句上,因为没有任何内容写入队列!当然,我测试它的原始形式,它工作正常。我的操作系统是Windows 10,我的Python版本是3.5.x,任何人都有任何想法,为什么发生这种情况?

更新:仍然不知道为什么这个示例代码与multiprocessing.process,而不是一个multiprocessing.Pool,但我发现work around我满足(亚历克斯Martelli的答案)。显然你可以创建一个多处理的全局列表。并且要传递每个进程和索引来使用,我将避免使用一个受管队列,因为它们比较慢。感谢Guest向我展示链接。

+1

你可能想看看[#1](https://stackoverflow.com/a/30039159/3767239),[# 2](https://stackoverflow.com/q/3217002/3767239),[#3](https://stackoverflow.com/q/9908781/3767239),[#4](https://stackoverflow.com/a/42659752/3767239),[#5](https://stackoverflow.com/a/25558333/3767239)。看起来原因是'Queue'实例不能被腌制。但是我不明白为什么这会使流程的底层队列陷入僵局。使用池大小为2的'Ctrl + C'显示它被卡在'task = inqueue.get()'它将请求目标函数的地方。这有点令人费解。 –

+0

请注意,对于异步编程,您不需要手动处理结果队列 - apply_async会返回['AsyncResult'](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool .AsyncResult)可以用来得到结果的实例:'result.get()'。这使用了一个基础结果(out-)队列,所以你只需要在你的目标函数中“返回”。同样,如果你使用'result.get()',并且你将'Queue'实例作为参数传递给目标函数,它将引发'RuntimeError'。不过,我很好奇为什么这不会发生在你的例子。 –

+0

看到我的评论给你的答案。我的目标不是“结果队列”,这仅仅是一个简单的例子。我需要一个连续写入和处理的队列。 – profPlum

回答

0

问题

当你调用apply_async它返回一个AsyncResult对象和叶工作量分配到一个单独的线程(见this answer)。此线程遇到Queue对象不能为pickled的问题,因此所请求的工作不能分发(并最终执行)。我们可以通过调用AsyncResult.get看到这一点:

r = p.apply_async(f,args=(q,)) 
r.get() 

这引起了RuntimeError

RuntimeError: Queue objects should only be shared between processes through inheritance 

然而,这RuntimeError是在主线程中只是提出一旦申请结果,因为它实际上是发生在不同的线程(并因此需要传输的方式)。

当你做

p.apply_async(f,args=(q,)) 

是目标函数f永远不会被调用,因为它的一个参数(q)不能腌制会发生什么。因此q永远不会收到一个项目并保持为空,因此在主线程中调用q.get将永远阻止。

解决方案

随着apply_async您不必手动管理的结果队列,但他们很容易在AsyncResult对象的形式提供给您。所以,你可以修改代码来简单地从目标函数返回:

from multiprocessing import Queue, Pool 

def f(): 
    return [42, None, 'hello'] 

if __name__ == '__main__': 
    q = Queue() 
    p = Pool(1) 
    result = p.apply_async(f) 
    print(result.get()) 
+0

有趣但我不明白代码是如何工作的,当你只使用multiprocessing.Process而不是multiprocessing.Pool时,他们都创建新的流程,所以不需要为两种方法对Queue进行pickle?另外使用AsyncResult解决方法对我来说不是真正可行的,因为我需要一堆工作进程持续写入队列,然后由另一个工作进程读取并处理该队列。 – profPlum

+0

@profPlum对于这些问题,我想参考[这个答案](https://stackoverflow.com/a/45184127/3767239)。其实质是'Pool'在收到'Queue'实例之后立即启动进程,而'Process'开始启动(此处不需要酸洗,进程尚未运行)。您可以使用['Manager'](https://docs.python.org/3/library/multiprocessing.html#managers)在进程之间共享对象,或者使用[''initializer''和'initargs'关键字参数[ Pool'](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool)。 –

+0

啊确定不确定为什么这个过程已经运行的事实需要酸洗,但有关于初始化和初始化的解释和信息我会接受你的回答 – profPlum