2017-08-04 64 views
2

在我开始提问之前,让我提一下我已经知道下面的多处理代码被破坏了。其中有TOCTOU错误。以下代码旨在为我提供教学目的,以便我可以更多地了解代码是如何被破坏的。所以我的问题是关于破碎代码的一个特定方面。首先,让我展示我的代码。为什么仅在访问多处理特定场景中的共享列表时发生“Broken pipe”错误?

现在,您可以完全忽略worker_b,因为我们现在不在任何地方使用它。我们稍后再回来。

import Queue 
import multiprocessing 
import time 

lock = multiprocessing.Lock() 

def pprint(s): 
    lock.acquire() 
    print(s) 
    lock.release() 

def worker_a(i, stack): 
    if stack: 
     data = stack.pop() 
     pprint('worker %d got %d' % (i, data)) 
     time.sleep(2) 
     pprint('worker %d exiting ...' % i) 
    else: 
     pprint('worker %d has nothing to do!' % i) 

def worker_b(i, stack): 
    if stack: 
     data = stack.pop() 
     pprint('worker %d got %d (stack length: %d)' % (i, data, len(stack))) 
     time.sleep(2) 
     pprint('worker %d exiting ... (stack length: %d)' % (i, len(stack))) 
    else: 
     pprint('worker %d has nothing to do!' % i) 

manager = multiprocessing.Manager() 
stack = manager.list() 

def master(): 
    for i in range(5): 
     stack.append(i) 
     pprint('master put %d' % i) 

    i = 0 
    while stack: 
     t = multiprocessing.Process(target=worker_a, args=(i, stack)) 
     t.start() 
     time.sleep(1) 
     i += 1 

    pprint('master returning ...') 

master() 

pprint('master returned!') 

以上破碎的代码似乎工作正常。

$ python mplifo.py 
master put 0 
master put 1 
master put 2 
master put 3 
master put 4 
worker 0 got 4 
worker 1 got 3 
worker 0 exiting ... 
worker 2 got 2 
worker 1 exiting ... 
worker 3 got 1 
worker 2 exiting ... 
worker 4 got 0 
worker 3 exiting ... 
master returning ... 
master returned! 
worker 4 exiting ... 

然而,如果我请worker_b代替worker_a,即改变

 t = multiprocessing.Process(target=worker_a, args=(i, stack)) 

 t = multiprocessing.Process(target=worker_b, args=(i, stack)) 

出现以下错误。

$ python mplifo.py 
master put 0 
master put 1 
master put 2 
master put 3 
master put 4 
worker 0 got 4 (stack length: 4) 
worker 1 got 3 (stack length: 3) 
worker 0 exiting ... (stack length: 3) 
worker 2 got 2 (stack length: 2) 
worker 1 exiting ... (stack length: 2) 
worker 3 got 1 (stack length: 1) 
worker 2 exiting ... (stack length: 1) 
worker 4 got 0 (stack length: 0) 
worker 3 exiting ... (stack length: 0) 
master returning ... 
master returned! 
Process Process-6: 
Traceback (most recent call last): 
    File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap 
    self.run() 
    File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 114, in run 
    self._target(*self._args, **self._kwargs) 
    File "mplifo.py", line 27, in worker_b 
    pprint('worker %d exiting ... (stack length: %d)' % (i, len(stack))) 
    File "<string>", line 2, in __len__ 
    File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod 
    conn.send((self._id, methodname, args, kwds)) 
IOError: [Errno 32] Broken pipe 
  • 为什么只出现在worker_b情况下,这个错误?
  • 为什么这个错误只发生在第二个pprint()呼叫worker_b而不是第一个pprint()呼叫?

回答

0

追溯的这一部分给你一个提示:

File "mplifo.py", line 27, in worker_b 
    pprint('worker %d exiting ... (stack length: %d)' % (i, len(stack))) 
    File "<string>", line 2, in __len__ 
    File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod 
    conn.send((self._id, methodname, args, kwds)) 

在工作进程,stack不是一个Python列表。它是由multiprocessing.Manager创建的代理,它包含驻留在主进程中的列表。当最后一个worker_b退出时,它会评估代理必须从主进程请求的len(stack)。但到那时,主人已经退出了 - 通往它的通讯管道被打破了。

这不会在worker_a中发生,因为它在退出前不会尝试评估len(stack)

相关问题