我试图在Python 2.7中使用 Queue.Queue实现多线程生产者 - 消费者模式。我试图弄清楚如何让 使用者(即工作者线程)在完成所有必需的工作后停止。如何在多线程生产者 - 使用者模式下完成工作线程后退出工作线程?
见马丁詹姆斯第二评论这个答案:https://stackoverflow.com/a/19369877/1175080
发送“我完了”的任务,指示池中的线程终止。任何得到这样的任务的线程都会重新执行它,然后自杀。
但这不适用于我。例如,请参阅以下代码。
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send an exit indicator for all threads to consume.
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
该程序挂起所有三个工人已经阅读出口指示灯, 即-1
从队列后,因为每个工人重新排队-1
前 退出,所以队列永远不会成为空和q.join()
永远不会返回。
我想出了以下但丑陋的解决方案,我通过队列发送-1
出口 指标为每名工人,使每个工人可以看到它 和自杀。但是,我必须为每个工作人员发送退出指示 ,这一事实感觉有点难看。
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send one stop indicator for each worker.
for i in range(3):
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
我有两个问题。
- 是否可以为所有线程发送单个退出指示符的方法(如Martin James的https://stackoverflow.com/a/19369877/1175080的第二条评论所述)甚至可以工作?
- 如果上一个问题的答案是“否”,是否有办法解决问题,我不必为每个工作线程发送单独的退出指示符?
发送每个工作看起来像一个很好的解决方案为我的杀人信号,我不会说是如此丑陋。您也可以加入线程而不是加入队列 – Netwave
请注意,有一个['ThreadPool'](https://stackoverflow.com/a/3386632/3767239)类可用,它负责“手动”分配任务在多个线程之间。你可以'加入'这样的池(而不是队列),然后发送*“停止”*信号将最终终止所有线程。其实我不明白你为什么要“加入”队列而不是线程。使用Python 3,你可以通过[concurrent](https://docs.python.org/dev/library/concurrent.futures.html#threadpoolexecutor)模块获得更多的功能(并且更好的记录)。 –
一些额外的评论。从你的示例代码中不清楚为什么你会首先使用这样一个*“stop”*命令(你可以把这个部分放出来,等待队列加入)。然后 - 如果你使用了这样的命令 - 不能保证每个线程都会“正常”关闭:'q.join()'可能会在所有线程收到'-1'之前恢复,因为你调用了'q'。在重新将'-1'加入到队列之前task_done()'(这意味着在重新放置'-1'之前任务计数可以达到零(这增加了计数)并且因此'q.join()'可以恢复)。 –