2016-09-10 24 views
3

给出以下Python代码:Python的多处理,游泳池地图 - 取消所有正在运行的进程,如果一个,返回所需的结果

import multiprocessing 

def unique(somelist): 
    return len(set(somelist)) == len(somelist) 


if __name__ == '__main__': 
    somelist = [[1,2,3,4,5,6,7,8,9,10,11,12,13,2], [1,2,3,4,5], [1,2,3,4,5,6,7,8,9,1], [0,1,5,1]] 

    pool = multiprocessing.Pool() 
    reslist = pool.map(unique, somelist) 
    pool.close() 
    pool.join() 
    print "Done!" 

    print reslist 

现在想象一下,与在这个玩具例如整数列表非常长,我想在这里实现以下内容:如果某个列表中的某个列表返回True,则会终止所有正在运行的进程。

这导致了两个问题(甚至更多,我还没有想出):

  • 我怎么能“读”从完成的加工结果/“听”,而其他进程正在跑步?如果例如一个进程正在处理来自somelist的[1,2,3,4,5],并且在所有其他进程之前完成,我如何才能在此刻读取该进程的结果?

  • 鉴于在其他运行时可以“读出”完成进程的结果的情况:如何将此结果用作终止所有其他正在运行的进程的条件?

例如,如果一个进程已经完成并且返回True,我该如何使用它作为终止所有其他(仍然)正在运行的进程的条件?

预先感谢您的任何提示 丹

回答

4

使用pool.imap_unordered,以查看他们拿出任何命令的结果。

reslist = pool.imap_unordered(unique, somelist) 
pool.close() 
for res in reslist: 
    if res: # or set other condition here 
     pool.terminate() 
     break 
pool.join() 

您可以遍历主进程中的imap reslist,但池进程仍在生成结果。

+0

也打破循环,否则可能会被卡住在游泳池终止后等待下一个结果。 –

+0

很好,谢谢:) –

1

没有花哨的IPC(进程间通信)技巧,最简单的方法是使用带回调函数的Pool方法。该回调在主程序中运行(在由multiprocessing创建的线程中),并在每个结果可用时使用。当回调看到你喜欢的结果时,它可以终止Pool。例如,

import multiprocessing as mp 

def worker(i): 
    from time import sleep 
    sleep(i) 
    return i, (i == 5) 

def callback(t): 
    i, quit = t 
    result[i] = quit 
    if quit: 
     pool.terminate() 

if __name__ == "__main__": 
    N = 50 
    pool = mp.Pool() 
    result = [None] * N 
    for i in range(N): 
     pool.apply_async(func=worker, args=(i,), callback=callback) 
    pool.close() 
    pool.join() 
    print(result) 

这几乎肯定会显示以下(OS调度变幻莫测可以允许其他输入或两个被消耗):

[False, False, False, False, False, True, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None] 
相关问题