13

我试图在Python 2.7中使用 Queue.Queue实现多线程生产者 - 消费者模式。我试图弄清楚如何让 使用者(即工作者线程)在完成所有必需的工作后停止。如何在多线程生产者 - 使用者模式下完成工作线程后退出工作线程?

见马丁詹姆斯第二评论这个答案:https://stackoverflow.com/a/19369877/1175080

发送“我完了”的任务,指示池中的线程终止。任何得到这样的任务的线程都会重新执行它,然后自杀。

但这不适用于我。例如,请参阅以下代码。

import Queue 
import threading 
import time 

def worker(n, q): 
    # n - Worker ID 
    # q - Queue from which to receive data 
    while True: 
     data = q.get() 
     print 'worker', n, 'got', data 
     time.sleep(1) # Simulate noticeable data processing time 
     q.task_done() 
     if data == -1: # -1 is used to indicate that the worker should stop 
      # Requeue the exit indicator. 
      q.put(-1) 
      # Commit suicide. 
      print 'worker', n, 'is exiting' 
      break 

def master(): 
    # master() sends data to worker() via q. 
    q = Queue.Queue() 

    # Create 3 workers. 
    for i in range(3): 
     t = threading.Thread(target=worker, args=(i, q)) 
     t.start() 

    # Send 10 items to work on. 
    for i in range(10): 
     q.put(i) 
     time.sleep(0.5) 

    # Send an exit indicator for all threads to consume. 
    q.put(-1) 

    print 'waiting for workers to finish ...' 
    q.join() 
    print 'done' 

master() 

该程序挂起所有三个工人已经阅读出口指示灯, 即-1从队列后,因为每个工人重新排队-1前 退出,所以队列永远不会成为空和q.join()永远不会返回。

我想出了以下但丑陋的解决方案,我通过队列发送-1出口 指标为每名工人,使每个工人可以看到它 和自杀。但是,我必须为每个工作人员发送退出指示 ,这一事实感觉有点难看。

import Queue 
import threading 
import time 

def worker(n, q): 
    # n - Worker ID 
    # q - Queue from which to receive data 
    while True: 
     data = q.get() 
     print 'worker', n, 'got', data 
     time.sleep(1) # Simulate noticeable data processing time 
     q.task_done() 
     if data == -1: # -1 is used to indicate that the worker should stop 
      print 'worker', n, 'is exiting' 
      break 

def master(): 
    # master() sends data to worker() via q. 
    q = Queue.Queue() 

    # Create 3 workers. 
    for i in range(3): 
     t = threading.Thread(target=worker, args=(i, q)) 
     t.start() 

    # Send 10 items to work on. 
    for i in range(10): 
     q.put(i) 
     time.sleep(0.5) 

    # Send one stop indicator for each worker. 
    for i in range(3): 
     q.put(-1) 

    print 'waiting for workers to finish ...' 
    q.join() 
    print 'done' 

master() 

我有两个问题。

  1. 是否可以为所有线程发送单个退出指示符的方法(如Martin James的https://stackoverflow.com/a/19369877/1175080的第二条评论所述)甚至可以工作?
  2. 如果上一个问题的答案是“否”,是否有办法解决问题,我不必为每个工作线程发送单独的退出指示符?
+0

发送每个工作看起来像一个很好的解决方案为我的杀人信号,我不会说是如此丑陋。您也可以加入线程而不是加入队列 – Netwave

+2

请注意,有一个['ThreadPool'](https://stackoverflow.com/a/3386632/3767239)类可用,它负责“手动”分配任务在多个线程之间。你可以'加入'这样的池(而不是队列),然后发送*“停止”*信号将最终终止所有线程。其实我不明白你为什么要“加入”队列而不是线程。使用Python 3,你可以通过[concurrent](https://docs.python.org/dev/library/concurrent.futures.html#threadpoolexecutor)模块获得更多的功能(并且更好的记录)。 –

+0

一些额外的评论。从你的示例代码中不清楚为什么你会首先使用这样一个*“stop”*命令(你可以把这个部分放出来,等待队列加入)。然后 - 如果你使用了这样的命令 - 不能保证每个线程都会“正常”关闭:'q.join()'可能会在所有线程收到'-1'之前恢复,因为你调用了'q'。在重新将'-1'加入到队列之前task_done()'(这意味着在重新放置'-1'之前任务计数可以达到零(这增加了计数)并且因此'q.join()'可以恢复)。 –

回答

11

不要将它称为任务的特例。

改为使用Event,为您的员工实施非阻塞实施。

stopping = threading.Event() 

def worker(n, q, timeout=1): 
    # run until the master thread indicates we're done 
    while not stopping.is_set(): 
     try: 
      # don't block indefinitely so we can return to the top 
      # of the loop and check the stopping event 
      data = q.get(True, timeout) 
     # raised by q.get if we reach the timeout on an empty queue 
     except queue.Empty: 
      continue 
     q.task_done() 

def master(): 
    ... 

    print 'waiting for workers to finish' 
    q.join() 
    stopping.set() 
    print 'done' 
+1

我相信你的意思是'q.get(True,timeout)',即你的意思是'q.get()'的'block'参数是'True'而不是'False'。如果将它设置为'False',那么'q.get()'根本不会被阻止,并立即返回,这将导致'while'循环非常快地旋转。如果您将其设置为“True”,则在解锁前阻止“超时”秒。 –

+0

你说得对,我知道了。固定。 :) –

11

可以发送一个出口指标的所有线程(如马丁詹姆斯https://stackoverflow.com/a/19369877/1175080第二注释解释)的方法,甚至工作?

正如你已经注意到它不能工作,传播消息将使最后一个线程更新队列与一个更多的项目,因为你正在等待一个永远不会是空的队列,而不是你的代码有。

如果上一个问题的答案是“否”,是否有办法解决问题的方式,我不必为每个工作线程发送单独的退出指示符?

可以join线程,而不是队列:

def worker(n, q): 
    # n - Worker ID 
    # q - Queue from which to receive data 
    while True: 
     data = q.get() 
     print 'worker', n, 'got', data 
     time.sleep(1) # Simulate noticeable data processing time 
     q.task_done() 
     if data == -1: # -1 is used to indicate that the worker should stop 
      # Requeue the exit indicator. 
      q.put(-1) 
      # Commit suicide. 
      print 'worker', n, 'is exiting' 
      break 

def master(): 
    # master() sends data to worker() via q. 
    q = Queue.Queue() 

    # Create 3 workers. 
    threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)] 
    for t in threads: 
     threads.start() 
    # Send 10 items to work on. 
    for i in range(10): 
     q.put(i) 
     time.sleep(0.5) 

    # Send an exit indicator for all threads to consume. 
    q.put(-1) 

    print 'waiting for workers to finish ...' 
    for t in threads: 
     t.join() 
    print 'done' 

master() 

由于Queue documentation解释get方法将上升,一旦其空的execption所以如果你已经知道的数据处理您可以填写队列和那么垃圾邮件的主题:

import Queue 
import threading 
import time 

def worker(n, q): 
    # n - Worker ID 
    # q - Queue from which to receive data 
    while True: 
     try: 
      data = q.get(block=False, timeout=1) 
      print 'worker', n, 'got', data 
      time.sleep(1) # Simulate noticeable data processing time 
      q.task_done() 
     except Queue.Empty: 
      break 


def master(): 
    # master() sends data to worker() via q. 
    q = Queue.Queue() 

    # Send 10 items to work on. 
    for i in range(10): 
     q.put(i) 

    # Create 3 workers. 
    for i in range(3): 
     t = threading.Thread(target=worker, args=(i, q)) 
     t.start() 

    print 'waiting for workers to finish ...' 
    q.join() 
    print 'done' 

master() 

这里有一个live example

+0

你能回答我的第一个问题吗? –

+0

@LoneLearner,是的,给我一个:) – Netwave

+0

@DanielSanchez你的回答很好,但你的第二个例子改变了这个问题。 OP首先实例化他的工作线程,然后填充队列,同时交换这些任务。如果您没有交换这些任务,那么如果在添加工作负载之前看到空队列,则可能会有一些工作线程提早退出(由于超时)。删除附加线程通信信号的方法是适当的。就个人而言,我认为有线程块的工作和主线程控制退出是大多数应用程序的最干净的方法。 – tdube

3

除了@DanielSanchez出色答卷,我建议实际上 依靠类似的机制为Java CountDownLatch

要点之中,

  • 创建latch只开放以后一定计数器下去
  • 当锁被打开时,该线程(或多个)等待它将被允许继续执行。

  • 我做了一个过于简单的例子,检查here一类像例子这样的锁存:

    import threading 
    import Queue 
    import time 
    
    WORKER_COUNT = 3 
    latch = threading.Condition() 
    count = 3 
    
    def wait(): 
        latch.acquire() 
        while count > 0: 
         latch.wait() 
        latch.release() 
    
    def count_down(): 
        global count 
        latch.acquire() 
        count -= 1 
        if count <= 0: 
         latch.notify_all() 
        latch.release() 
    
    def worker(n, q): 
        # n - Worker ID 
        # q - Queue from which to receive data 
        while True: 
         data = q.get() 
         print 'worker', n, 'got', data 
         time.sleep(1) # Simulate noticeable data processing time 
         q.task_done() 
         if data == -1: # -1 is used to indicate that the worker should stop 
          # Requeue the exit indicator. 
          q.put(-1) 
          # Commit suicide. 
          count_down() 
          print 'worker', n, 'is exiting' 
          break 
    
    # master() sends data to worker() via q. 
    
    def master(): 
        q = Queue.Queue() 
    
        # Create 3 workers. 
        for i in range(WORKER_COUNT): 
         t = threading.Thread(target=worker, args=(i, q)) 
         t.start() 
    
        # Send 10 items to work on. 
        for i in range(10): 
         q.put(i) 
         time.sleep(0.5) 
    
        # Send an exit indicator for all threads to consume. 
        q.put(-1) 
        wait() 
        print 'done' 
    
    master() 
    
4

只是为了完整性缘故 你也可以排队停止信号是 - (线程数)。 每个线程则可以通过一个递增并重新排队它只有在停止信号!= 0

if data < 0: # negative numbers are used to indicate that the worker should stop 
     if data < -1: 
      q.put(data + 1) 
     # Commit suicide. 
     print 'worker', n, 'is exiting' 
     break 

但我会亲自去与Travis Mehlinger Daniel Sanchez答案。