Python concurrency: hangs when using apply_async()

I am trying to understand Python concurrency. As an experiment, I wrote the following program, which uses a process pool and invokes workers via apply_async(). To share information between the processes (work items and results), I use queues from multiprocessing.Manager().
However, this code hangs (sometimes) once all items in the work queue have been processed, and I am not sure why. I have to run the program several times to observe the behavior.
As a side note, I can make this work correctly using a design pattern people sometimes call the "poison pill," and it seems to work: in my worker() method, I loop forever and break out of the loop when the work queue yields a sentinel value, and I put as many sentinel values on the work queue as there are processes.
But I am still interested in finding out why this code hangs. I get output like the following (the process IDs are in brackets):
Found 8 CPUs.
Operation queue has 20 items.
Will start 2 processes.
Joining pool...
[5885] entering worker() with work_queue size of 20
[5885] processed work item 0
[5885] worker() still running because work_queue has size 19
[5885] processed work item 1
[5885] worker() still running because work_queue has size 18
[5885] processed work item 2
[5885] worker() still running because work_queue has size 17
[5885] processed work item 3
[5885] worker() still running because work_queue has size 16
[5885] processed work item 4
[5885] worker() still running because work_queue has size 15
[5885] processed work item 5
[5886] entering worker() with work_queue size of 14
[5885] worker() still running because work_queue has size 14
[5886] processed work item 6
[5886] worker() still running because work_queue has size 13
[5885] processed work item 7
[5886] processed work item 8
[5885] worker() still running because work_queue has size 11
[5886] worker() still running because work_queue has size 11
[5885] processed work item 9
[5886] processed work item 10
[5885] worker() still running because work_queue has size 9
[5886] worker() still running because work_queue has size 9
[5885] processed work item 11
[5886] processed work item 12
[5885] worker() still running because work_queue has size 7
[5886] worker() still running because work_queue has size 7
[5885] processed work item 13
[5886] processed work item 14
[5885] worker() still running because work_queue has size 5
[5886] worker() still running because work_queue has size 5
[5885] processed work item 15
[5886] processed work item 16
[5885] worker() still running because work_queue has size 3
[5886] worker() still running because work_queue has size 3
[5885] processed work item 17
[5886] processed work item 18
[5885] worker() still running because work_queue has size 1
[5886] worker() still running because work_queue has size 1
[5885] processed work item 19
[5885] worker() still running because work_queue has size 0
[5885] worker() is finished; returning results of size 20
(The program hangs at that last line. The other process, 5886, never seems to finish.)
import multiprocessing
from multiprocessing import Pool
import os

# Python 2.7.6 on Linux

# Worker (consumer) process
def worker(work_queue, results_queue):
    print "[%d] entering worker() with work_queue size of %d" % (os.getpid(), work_queue.qsize())
    while not work_queue.empty():
        item = work_queue.get()
        print "[%d] processed work item %s" % (os.getpid(), item)
        s = '[%d] has processed %s.' % (os.getpid(), item)
        results_queue.put(s)
        work_queue.task_done()
        print "[%d] worker() still running because work_queue has size %d" % (os.getpid(), work_queue.qsize())
    print "[%d] worker() is finished; returning results of size %d" % (os.getpid(), results_queue.qsize())

if __name__ == '__main__':
    MAX_PROCESS = 2   # Max number of processes to create
    MAX_ITEMS = 20    # Max work items to process
    m = multiprocessing.Manager()
    results_queue = m.Queue()
    work_queue = m.Queue()

    # Assign work
    for x in xrange(MAX_ITEMS):
        work_queue.put(x)

    print "Found %d CPUs." % multiprocessing.cpu_count()
    print "Operation queue has %d items." % work_queue.qsize()
    print "Will start %d processes." % MAX_PROCESS

    # Pool method
    pool = Pool(processes=MAX_PROCESS)
    for n in range(0, MAX_PROCESS):
        pool.apply_async(worker, args=(work_queue, results_queue))
    pool.close()

    print "Joining pool..."
    pool.join()
    print "Joining pool finished..."

    print "--- After pool completion ---"
    print "Work queue has %d items." % work_queue.qsize()
    print "Results queue has %d items." % results_queue.qsize()
    print "Results are:"
    while not results_queue.empty():
        item = results_queue.get()
        print str(item)
        results_queue.task_done()
    print "--End---"
Thanks for your help.