2011-10-31 69 views
1

问题是消费者永远不会退出,它只是无所事事。该代码旨在以这样的方式工作:蟒蛇多处理自给式消费锁永远

创建队列并将某个task_data传递给它。指定数量的消费者被创建来处理数据。当消费者发现队列为空时,由于其他消费者仍有可能将某些东西放入队列中,所以它不能离开,但它可以在consumers_finished列表中指示它没有作业。消费者循环继续,直到每个工人都表示他们完成了工作。目前还不知道会有多少工作,因为消费者会将任务放入队列中。我读了一些关于这方面的内容,但不清楚如果进程自己养活,进程是否会永远等待。

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, results, consumers_finished): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.results = results 
     self.consumers_finished = consumers_finished 

    def run(self): 
     while not all(flag for flag in self.consumers_finished.values()): 
      task_data = self.task_queue.get() 
      if not task_data: 
       self.consumers_finished[self.name] = True 
       continue 

      self.consumers_finished[self.name] = False 
      task_result = self.do_some_processing(task_data) 
      self.task_queue.put(task_result) 


class Starter(object): 

    def start(self): 
     manager = multiprocessing.Manager() 
     task_queue = multiprocessing.Queue() 
     results = manager.list() 
     consumers_finished = manager.dict() 

     consumers = [Consumer(task_queue, results, consumers_finished) for i in range(self.consumers_count)] 

     for consumer in consumers: 
      consumers_finished[consumer.name] = False 
      consumer.start() 

     task_queue.put(task_data) 

     for consumer in consumers: consumer.join() 

     return results 
+0

相关:http://stackoverflow.com/questions/19071529/python-multiprocessing-125-list-never-finishes –

回答

2

看起来像一个良好的睡眠确实有帮助,刷新的头脑可以做更多的事情.. 无论如何,我找到了解决学习Python文档后。

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, results, consumers_finished): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.results = results 
     self.consumers_finished = consumers_finished 

    def run(self): 
     while not all(flag for flag in self.consumers_finished.values()): 
      try: 
       task = self.todo_queue.get(False) 
       self.consumers_finished[self.name] = False 
      except QueueEmpty: 
       self.consumers_stopped[self.name] = True 
       continue 

      task_result = self.do_some_processing(task_data) 
      self.task_queue.put(task_result) 


class Starter(object): 

    def start(self): 
     manager = multiprocessing.Manager() 
     task_queue = manager.Queue() 
     results = manager.list() 
     consumers_finished = manager.dict() 

     consumers = [Consumer(task_queue, results, consumers_finished) for i in range(self.consumers_count)] 

     for consumer in consumers: 
      consumers_finished[consumer.name] = False 
      consumer.start() 

     task_queue.put(task_data) 

     for consumer in consumers: consumer.join() 

     return results 

这是从Python文档,这也解释了我的问题的一部分,我认为:

警告正如上面提到的,如果一个子进程已经把项目上 队列(现在也没有使用JoinableQueue.cancel_join_thread()),然后 该进程将不会终止,直到所有缓冲项已被清空到管道 。这意味着如果您尝试加入该进程 ,则可能会发生死锁,除非您确定所有已装入队列的项目都已被占用。同样,如果 子进程是非守护进程,那么当 尝试加入其所有非守护进程子进程时,父进程可能会在退出时挂起。 请注意,使用管理器创建的队列 不存在此问题。请参阅编程 准则。

所以我只是改变了队列,它现在由管理器创建,并且在消费者的run方法中,任务以不同的方式从队列中取出,请参阅代码。