Python concurrency: hangs when using apply_async()

I am trying to understand Python concurrency. As an experiment, I have the following program that uses a process pool and calls workers via apply_async(). To share information between the processes (work items and results), I use queues from multiprocessing.Manager().

However, this code sometimes hangs when all the items in the work queue have been processed, and I am not sure why. I have to run the program several times to observe the behavior.

As a side note, I can make this work correctly: I found a design pattern people sometimes call the "poison pill" approach, and it seems to work. (In my worker() method, I loop forever and break out of the loop when my work queue contains a sentinel value. I put as many sentinel values on the work queue as I have running processes.)
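A minimal sketch of what I mean (None is assumed as the sentinel value here, and the names are illustrative, not from the program below):

# Sentinel-based worker: a minimal sketch, assuming None as the sentinel.
SENTINEL = None

def sentinel_worker(work_queue, results_queue):
    while True:                  # loop forever...
        item = work_queue.get()  # ...so a blocking get is safe
        if item is SENTINEL:
            break                # ...until the sentinel arrives
        results_queue.put('[%d] has processed %s.' % (os.getpid(), item))
        work_queue.task_done()

# In the main process, after the real work items, add one
# sentinel per worker process:
for _ in xrange(MAX_PROCESS):
    work_queue.put(SENTINEL)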

But I am still interested in finding out why this code hangs. I get output like the following (the process IDs are in brackets):

Found 8 CPUs. 
Operation queue has 20 items. 
Will start 2 processes. 
Joining pool... 
[5885] entering worker() with work_queue size of 20 
[5885] processed work item 0 
[5885] worker() still running because work_queue has size 19 
[5885] processed work item 1 
[5885] worker() still running because work_queue has size 18 
[5885] processed work item 2 
[5885] worker() still running because work_queue has size 17 
[5885] processed work item 3 
[5885] worker() still running because work_queue has size 16 
[5885] processed work item 4 
[5885] worker() still running because work_queue has size 15 
[5885] processed work item 5 
[5886] entering worker() with work_queue size of 14 
[5885] worker() still running because work_queue has size 14 
[5886] processed work item 6 
[5886] worker() still running because work_queue has size 13 
[5885] processed work item 7 
[5886] processed work item 8 
[5885] worker() still running because work_queue has size 11 
[5886] worker() still running because work_queue has size 11 
[5885] processed work item 9 
[5886] processed work item 10 
[5885] worker() still running because work_queue has size 9 
[5886] worker() still running because work_queue has size 9 
[5885] processed work item 11 
[5886] processed work item 12 
[5885] worker() still running because work_queue has size 7 
[5886] worker() still running because work_queue has size 7 
[5885] processed work item 13 
[5886] processed work item 14 
[5885] worker() still running because work_queue has size 5 
[5886] worker() still running because work_queue has size 5 
[5885] processed work item 15 
[5886] processed work item 16 
[5885] worker() still running because work_queue has size 3 
[5886] worker() still running because work_queue has size 3 
[5885] processed work item 17 
[5886] processed work item 18 
[5885] worker() still running because work_queue has size 1 
[5886] worker() still running because work_queue has size 1 
[5885] processed work item 19 
[5885] worker() still running because work_queue has size 0 
[5885] worker() is finished; returning results of size 20 

(The program hangs at that last line; the other process, 5886, never seems to finish.)

import multiprocessing 
from multiprocessing import Pool 
import os 

# Python 2.7.6 on Linux 

# Worker (consumer) process 
def worker(work_queue, results_queue):
    print "[%d] entering worker() with work_queue size of %d" % (os.getpid(), work_queue.qsize())
    while not work_queue.empty():
        item = work_queue.get()
        print "[%d] processed work item %s" % (os.getpid(), item)
        s = '[%d] has processed %s.' % (os.getpid(), item)
        results_queue.put(s)
        work_queue.task_done()
        print "[%d] worker() still running because work_queue has size %d" % (os.getpid(), work_queue.qsize())
    print "[%d] worker() is finished; returning results of size %d" % (os.getpid(), results_queue.qsize())

if __name__ == '__main__':

    MAX_PROCESS = 2  # Max number of processes to create
    MAX_ITEMS = 20   # Max work items to process

    m = multiprocessing.Manager()
    results_queue = m.Queue()
    work_queue = m.Queue()

    # Assign work
    for x in xrange(MAX_ITEMS):
        work_queue.put(x)

    print "Found %d CPUs." % multiprocessing.cpu_count()
    print "Operation queue has %d items." % work_queue.qsize()
    print "Will start %d processes." % MAX_PROCESS

    # Pool method
    pool = Pool(processes=MAX_PROCESS)
    for n in range(0, MAX_PROCESS):
        pool.apply_async(worker, args=(work_queue, results_queue))
    pool.close()
    print "Joining pool..."
    pool.join()
    print "Joining pool finished..."
    print "--- After pool completion ---"

    print "Work queue has %d items." % work_queue.qsize()
    print "Results queue has %d items." % results_queue.qsize()
    print "Results are:"

    while not results_queue.empty():
        item = results_queue.get()
        print str(item)
        results_queue.task_done()
    print "--End---"

Thanks for your help.

Answer


You are hitting a race condition: worker process 5886 saw that there was one item left in the work queue:

[5886] worker() still running because work_queue has size 1 

So it looped back around to the blocking get call:

while not work_queue.empty():  # It sees it's not empty here
    item = work_queue.get()    # But there's no item left by the time it gets here!

However, after it called work_queue.empty() but before it called work_queue.get(), the other worker (5885) consumed the last item in the queue:

[5885] processed work item 19 
[5885] worker() still running because work_queue has size 0 
[5885] worker() is finished; returning results of size 20 

So now 5886 will block forever on that get. In general, when a queue has more than one consumer, you should not use the empty() method to decide whether to make a blocking get() call, because it is vulnerable to exactly this race condition. The "poison pill"/sentinel approach you mentioned is the right way to handle this situation. Alternatively, you can use a non-blocking get call and catch the Queue.Empty exception it raises, so the worker's loop becomes:

import Queue  # the Empty exception lives in the Queue module on Python 2

while True:
    try:
        item = work_queue.get_nowait()
        print "[%d] processed work item %s" % (os.getpid(), item)
        s = '[%d] has processed %s.' % (os.getpid(), item)
        results_queue.put(s)
        work_queue.task_done()
        print "[%d] worker() still running because work_queue has size %d" % (os.getpid(), work_queue.qsize())
    except Queue.Empty:
        print "[%d] worker() is finished; returning results of size %d" % (os.getpid(), results_queue.qsize())
        break

Note that you can only use this approach if you know that the queue's size will not grow once the workers start consuming it. Otherwise, a worker could decide to quit while more items are still being added to the queue.
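To make that caveat concrete, here is a hypothetical producer (producer() is not part of the question's program) that keeps adding items after the workers have started. A worker polling with empty() or get_nowait() could quit while it is still producing, but the sentinel approach stays safe because the sentinels are only enqueued after all the real work:

def producer(work_queue, n_items, n_workers):
    # Work trickles in while workers are already consuming, so a
    # worker polling with get_nowait() may see a transiently empty
    # queue and exit too early.
    for x in xrange(n_items):
        work_queue.put(x)
    # No more work is coming, so it is now safe to tell each worker
    # to shut down.
    for _ in xrange(n_workers):
        work_queue.put(None)  # None as the sentinel, as sketched above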