3
我有一份我希望分发给池中工作人员的任务列表。我想达到两个目的:多处理:池:等待所有结果,但立即处理单个结果
- 当工人完成,过程,结果立即
- 有一个简单的方法来等待所有的工人来完成。
使用fapply_async,我可以轻松实现第一个目标。只要工作人员完成,回调就会被调用。但是,为了实现第二个目标,我能想出的唯一解决方案基本上只是轮询AsyncResults,直到它们都准备好()。
使用map_async,我可以轻松实现第二个目标。但是,只有当所有工作人员完成后,回调才会被调用一次。我相信我理解这个的原因(结果的顺序是相关的)。
是否有一些解决方案我错过了将实现目标1和2?
这里是我的测试代码:
#!/usr/bin/python3
import multiprocessing
import time
import random
def worker(src):
time.sleep(0.2)
# src is apply_async or map_async
return (src, random.randint(1, 100))
def map_async_example():
tasks = ['map_async'] * 20
with multiprocessing.Pool(processes=4) as pool:
r = pool.map_async(worker, tasks, callback=print)
r.wait()
def fapply_async_example():
tasks = [('fapply_async',)] * 20
with multiprocessing.Pool(processes=4) as pool:
ars = []
for t in tasks:
ar = pool.apply_async(worker, t, callback=print)
ars.append(ar)
# Wait for all AsyncResults to become ready()
while len(ars) > 0:
time.sleep(0.5)
# Keep only the not-ready results
ars = [ar for ar in ars if not ar.ready()]
def main():
# One list of 20 results
print('===============')
print('Using map_async')
print('===============')
map_async_example()
# 20 results
print('==================')
print('Using fapply_async')
print('==================')
fapply_async_example()
if __name__ == '__main__':
main()
是的,它做到了。谢谢!我错误地解释了之前必要的close()函数的效果,所以我没有尝试。 – Duoran 2015-02-23 13:30:43