1

我正在尝试利用Python3中的concurrent.futures.ProcessPoolExecutor来并行处理大矩阵。代码的一般结构是:为什么concurrent.futures.ProcessPoolExecutor的性能很低?

class X(object): 

self.matrix 

def f(self, i, row_i): 
    <cpu-bound process> 

def fetch_multiple(self, ids): 
    with ProcessPoolExecutor() as executor: 
     futures = [executor.submit(self.f, i, self.matrix.getrow(i)) for i in ids] 
     return [f.result() for f in as_completed(futures)] 

self.matrix是一个大scipy csr_matrixf是我的concurrrent函数,它需要一行self.matrix并对其应用CPU上的进程。最后,fetch_multiple是并行运行多个实例f并返回结果的函数。

的问题是,在运行脚本之后,所有的CPU核心均小于50%,忙(见下截图):

enter image description here

为什么所有的内核是不是很忙?

我认为问题是self.matrix的大对象,并在进程之间传递行向量。我怎么解决这个问题?

回答

1

是的。 开销不应该那么大 - 但它很可能是你的CPU出现中断的原因(尽管它们应该忙于传递数据)。

但试试这里的配方,将对象的“指针”传递给使用共享内存的子进程。

http://briansimulator.org/sharing-numpy-arrays-between-processes/

从那里报价:

from multiprocessing import sharedctypes 
size = S.size 
shape = S.shape 
S.shape = size 
S_ctypes = sharedctypes.RawArray('d', S) 
S = numpy.frombuffer(S_ctypes, dtype=numpy.float64, count=size) 
S.shape = shape 

现在我们可以发送S_ctypes和形状的子进程 多,并将其转换回numpy的数组中的子 过程如下:

from numpy import ctypeslib 
S = ctypeslib.as_array(S_ctypes) 
S.shape = shape 

这应该是棘手的照顾引用计数,但我想numpy.ctypeslib照顾 - 所以,只是协调实际行号传递给子进程,他们不在同一个数据上工作