2017-03-08

I am trying to understand Python's multiprocessing, but I am currently struggling with the following: using a multiprocessing queue and a worker pool, and letting the workers extend the queue.

I start by feeding large objects from a generator function into a queue, which are then consumed by the workers in a pool. This works fine, but I now want to extend my program to allow the workers to add work to the queue themselves. This is the part where I run into problems, because the work I add in the first loop is immediately followed by the stop sentinels added in the second loop (see the example code). This means any extra work added by a worker will never be executed...

I think all that is needed is to check that the queue is empty and that none of the workers are processing anything, and only then run the final loop that stops the workers. But I do not know how to check the workers' state to do this.

Minimal code showing the problem:

import multiprocessing, time, random

def f(queue):
    worker_name = multiprocessing.current_process().name
    print "Started: {}".format(worker_name)

    while True:
        value = queue.get()
        if value is None:
            break

        print "{} is processing '{}'".format(worker_name, value)
        # compute(value)
        time.sleep(1)

        # Worker may add additional work to the queue
        if random.random() > 0.7:
            queue.put("Extra work!")

    print "Stopping: {}".format(worker_name)


n_workers = 4
queue = multiprocessing.Queue()
pool = multiprocessing.Pool(n_workers, f, (queue,))

# Feed large objects from generator
for i in xrange(20):
    queue.put(i)

# All extra work is skipped

# Terminate workers after finishing work
for __ in xrange(n_workers):
    queue.put(None)

pool.close()
pool.join()

print "Finished!"
print queue.get()  # Will yield 'Extra work!' -- should be empty

Answer


Using a counting semaphore value, I managed to achieve what I wanted. I track each worker's activity by incrementing/decrementing this value, and stop the workers as soon as the queue is empty and no worker is processing anything anymore.

Any comments are appreciated.

Example code:

import multiprocessing, time, random

def f(queue, semaphore):
    worker_name = multiprocessing.current_process().name
    print "Started: {}".format(worker_name)

    while True:
        value = queue.get()
        if value is None:
            break

        with semaphore.get_lock():
            semaphore.value -= 1

        print "{} is processing '{}'".format(worker_name, value)
        # compute(value)
        time.sleep(1)

        # Worker may add additional work to the queue
        if random.random() > 0.7:
            queue.put("Extra work!")

        with semaphore.get_lock():
            semaphore.value += 1

    print "Stopping: {}".format(worker_name)


n_workers = 4
semaphore = multiprocessing.Value('i', n_workers)
queue = multiprocessing.Queue()
pool = multiprocessing.Pool(n_workers, f, (queue, semaphore))

# Feed large objects from generator
for i in xrange(20):
    queue.put(i)

# Wait until the queue is drained AND all workers are idle again
while not queue.empty() or semaphore.value != n_workers:
    time.sleep(0.2)

# Terminate workers after finishing work
for __ in xrange(n_workers):
    queue.put(None)

pool.close()
pool.join()

print "Finished!"
print queue.empty()  # True
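For comparison, a similar result can be sketched without the polling loop by using multiprocessing.JoinableQueue: each get() is matched by a task_done(), and queue.join() blocks until every item, including extra work added by the workers, has been processed. This is not the original poster's solution, just a minimal Python 3 sketch of the same idea (the sleep duration and the 0.7 threshold are illustrative):

```python
import multiprocessing
import random
import time

def worker(queue):
    """Consume items until a None sentinel arrives; may add extra work."""
    while True:
        value = queue.get()
        if value is None:
            queue.task_done()
            break
        time.sleep(0.1)  # compute(value)
        if random.random() > 0.7:
            # The extra item is put() before task_done() is called, so the
            # unfinished-task counter never drops to zero while work remains.
            queue.put("Extra work!")
        queue.task_done()

def main():
    n_workers = 4
    queue = multiprocessing.JoinableQueue()
    pool = multiprocessing.Pool(n_workers, worker, (queue,))

    for i in range(20):
        queue.put(i)

    queue.join()                 # blocks until all work (incl. extra) is done
    for _ in range(n_workers):
        queue.put(None)          # only now is it safe to send the sentinels
    queue.join()

    pool.close()
    pool.join()
    return queue.empty()

if __name__ == "__main__":
    print(main())
```

Because extra work is enqueued before the current item is marked done, join() cannot return while any worker might still add items, which avoids both the busy-wait and the shared counter.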