2010-06-25 123 views
6

我在使用这段代码的死锁问题:的Python multiprocessing.Queue死锁,并得到


def _entropy_split_parallel(data_train, answers_train, weights): 
    CPUS = 1 #multiprocessing.cpu_count() 
    NUMBER_TASKS = len(data_train[0]) 
    processes = [] 

    multi_list = zip(data_train, answers_train, weights) 

    task_queue = multiprocessing.Queue() 
    done_queue = multiprocessing.Queue() 

    for feature_index in xrange(NUMBER_TASKS): 
     task_queue.put(feature_index) 

    for i in xrange(CPUS): 
     process = multiprocessing.Process(target=_worker, 
       args=(multi_list, task_queue, done_queue)) 
     processes.append(process) 
     process.start() 

    min_entropy = None 
    best_feature = None 
    best_split = None 
    for i in xrange(NUMBER_TASKS): 
     entropy, feature, split = done_queue.get() 
     if (entropy < min_entropy or min_entropy == None) and entropy != None: 
      best_feature = feature 
      best_split = split 

    for i in xrange(CPUS): 
     task_queue.put('STOP') 

    for process in processes: 
     process.join() 

    return best_feature, best_split 


def _worker(multi_list, task_queue, done_queue): 
    feature_index = task_queue.get() 
    while feature_index != 'STOP': 
     result = _entropy_split3(multi_list, feature_index) 
     done_queue.put(result) 
     feature_index = task_queue.get() 

当我运行我的程序,它通过_entropy_split_parallel工作正常多次运行,但最终死锁。父进程在done_queue.get()上被阻止,并且工作进程在done_queue.put()上被阻止。由于在发生这种情况时队列总是空的,因此预计在get上阻塞。我不明白的是为什么工人在put上阻塞,因为队列显然没有满(这是空的!)。我试过blocktimeout关键字参数,但得到相同的结果。

我正在使用多处理backport,因为我被困在Python 2.5中。


编辑:它看起来像我也越来越与多处理模块提供的示例之一的死锁问题。这是底部的第三个例子here.如果我多次调用测试方法,死锁似乎只会发生。例如,更改脚本的底部,这样的:


if __name__ == '__main__': 
    freeze_support() 
    for x in xrange(1000): 
     test() 

编辑:我知道这是一个老问题,但测试表明,这已不再是与Python 2.7 Windows中的一个问题。我会尝试Linux并回报。

回答

0

这个问题与较新版本的Python不兼容,所以我假设它是一个backport的问题。无论如何,这不再是一个问题。

4

我认为问题是父线程加入它已经通过队列的子线程。这是讨论多处理模块的programming guidelines section

无论如何,我遇到了与您描述的相同的症状,并且在重构我的逻辑以便主线程未加入子线程时,没有发生死锁。我的重构逻辑涉及知道我应该从结果或“完成”队列中获得的项目数量(可以根据子线程数量或工作队列上的项目数量等进行预测),以及循环无限地收集所有这些信息。逻辑

“玩具”插图:

num_items_expected = figure_it_out(work_queue, num_threads) 
items_received = [] 
while len(items_received) < num_items_expected: 
    items_received.append(done_queue.get()) 
    time.sleep(5) 

上述逻辑避免了父线程加入子线程的需要,但允许父线程阻塞,直到所有的孩子都做了。这种方法避免了我的死锁问题。

+0

我认为所有队列在进程加入时应该是空的,所以这应该不成问题。另外,主进程在put上死锁,而不是加入。我刚刚升级了Python(我被一个旧版本卡住了),所以我会再次测试这个。 – ajduff574 2010-08-26 06:22:33

+0

@ajduff在我的情况下,死锁没有发生在连接上,但是放置也是如此,除了放在子线程中。另外,就我而言,正在输入的队列是空的。所以我认为这也值得一试(也就是避免主线程加入子线程)。 – Jeet 2010-08-26 10:01:58