2016-09-28 165 views
2

我正在使用pathos.multiprocessing来并行化需要使用实例方法的程序。下面是一个最小的工作例如:如何将关键字列表传递给pathos.multiprocessing?

import time 
import numpy as np 
from pathos.multiprocessing import Pool, ProcessingPool, ThreadingPool 

class dummy(object): 
    def __init__(self, arg, key1=None, key2=-11): 

     np.random.seed(arg) 

     randnum = np.random.randint(0, 5) 

     print 'Sleeping {} seconds'.format(randnum) 
     time.sleep(randnum) 

     self.value = arg 
     self.more1 = key1 
     self.more2 = key2 

args = [0, 10, 20, 33, 82] 
keys = ['key1', 'key2'] 
k1val = ['car', 'borg', 'syria', 'aurora', 'libera'] 
k2val = ['a', 'b', 'c', 'd', 'e'] 
allks = [dict(zip(keys, [k1val[i], k2val[i]])) for i in range(5)] 

pool = ThreadingPool(4) 
result = pool.map(dummy, args, k1val, k2val) 

print [[r.value, r.more1, r.more2] for r in result] 

打印结果(如预期):

Sleeping 4 seconds 
Sleeping 1 seconds 
Sleeping 3 seconds 
Sleeping 4 seconds 
Sleeping 3 seconds 
[[0, 'car', 'a'], [10, 'borg', 'b'], [20, 'syria', 'c'], [33, 'aurora', 'd'], [82, 'libera', 'e']] 

然而,在这个调用map最后两个参数事项的顺序,如果我这样做:

result2 = pool.map(dummy, args, k2val, k1val) 

我获得:

[[0, 'a', 'car'], [10, 'b', 'borg'], [20, 'c', 'syria'], [33, 'd', 'aurora'], [82, 'e', 'libera']] 

而我想获得相同的第一个结果。该行为是一样什么apply_asynckwds可以在标准模块multiprocessing在做,即通过字典,其中每个字典键是关键字名称和项目的关键字参数(见allks)的列表。请注意,标准模块multiprocessing不能使用实例方法,因此甚至不符合最低要求。

姑且这将是: 结果= pool.map(假人,ARGS,kwds = allks)#这不工作

回答

3

我是pathos作者。是的,你碰到我所知道的东西需要一点工作。目前,来自ProcessPoolThreadPool,并ParallelPoolmappipe(即apply)方法,不能采取kwds - 你必须通过他们为args。但是,如果你使用_ProcessPool_ThreadPool,那么你可以传递kwds他们mapapply方法。 在pathos.pools以下划线开始的池实际上都直接从multiprocess,所以它们具有相同的API来那些multiprocessing(但具有更好的系列化,因此可以通过类方法等)。

>>> from pathos.pools import _ProcessPool 
>>> from multiprocess.pool import Pool 
>>> Pool is _ProcessPool 
True 

因此,对于编辑的原始代码看起来像这样(从OP的建议编辑):

>>> from pathos.pools import _ThreadPool 
>>> pool = _ThreadPool(4) 
>>> 
[…] 
>>> result = [] 
>>> def callback(x): 
>>> result.append(x) 
>>> 
>>> for a, k in zip(args, allks): 
>>>  pool.apply_async(dummy, args=(a,), kwds=k, callback=callback) 
>>> 
>>> pool.close() 
>>> pool.join() 
+0

一种发自内心的特大型感谢您投入的时间和精力在开发' pathos'。解决我的具体问题的附加荣誉。 – astabada