你的代码失败,因为它无法pickle
实例方法(self.cal
),这是Python的试图通过它们映射到multiprocessing.Pool
(当然,是有办法做到这一点,当你产卵多个进程做,但它的方式太令人费解,而不是非常有用反正) - 因为有它有“包”的数据并将其发送到生成的过程,拆包没有共享内存的访问。如果您试图腌制a
实例,则会发生同样的情况。
在multiprocessing
包唯一可用的共享内存访问是一个鲜为人知的multiprocessing.pool.ThreadPool
所以如果你真的想这样做:
from multiprocessing.pool import ThreadPool
class A():
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = ThreadPool(processes=4)
rs = t.map(self.cal, dt)
t.close()
return rs
a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
但是,这不会给你的并行化,因为它本质上映射到你的正常的线程它们可以访问共享内存。你应该通过类/静态方法,而不是(如果你需要他们的称呼)伴随着这些数据,您希望他们与(在你的情况self.vl
)工作。如果您需要跨进程共享数据,则必须使用一些共享内存抽象,如multiprocessing.Value
,当然应用互斥量。
UPDATE
我说你可以做到这一点(也有或多或少都在做它的模块,检查pathos.multiprocessing
为例),但我不认为这是值得的麻烦 - 当你来到你必须欺骗你的系统做你想做的事情,有可能是你使用了错误的系统,或者你应该重新考虑你的设计。但是,对于信息性的缘故,这里是做你想做的事,一个多设置一个办法:
import sys
from multiprocessing import Pool
def parallel_call(params): # a helper for calling 'remote' instances
cls = getattr(sys.modules[__name__], params[0]) # get our class type
instance = cls.__new__(cls) # create a new instance without invoking __init__
instance.__dict__ = params[1] # apply the passed state to the new instance
method = getattr(instance, params[2]) # get the requested method
args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
return method(*args) # expand arguments, call our method and return the result
class A(object):
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = Pool(processes=4)
rs = t.map(parallel_call, self.prepare_call("cal", dt))
t.close()
return rs
def prepare_call(self, name, args): # creates a 'remote call' package for each argument
for arg in args:
yield [self.__class__.__name__, self.__dict__, name, arg]
if __name__ == "__main__": # important protection for cross-platform use
a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
我认为这是非常自我解释它是如何工作的,但总之它通过你的类的名称,其当前状态(没有信号,寿),被称为一个期望的方法和参数与一个parallel_call
函数被称为在Pool
每个过程调用它。蟒自动泡菜和unpickles所有这些数据,因此所有parallel_call
需要做的是重建原始对象,找到在它的期望的方法和使用所提供的参数(或多个)调用它。
这样,我们只传递数据而不尝试传递活动对象,所以Python不会抱怨(在这种情况下,尝试将实例方法的引用添加到类参数并查看会发生什么情况)以及一切正常。
如果你想要沉迷于'魔法',你可以使它看起来完全像你的代码(创建你自己的Pool
处理程序,从函数中取出名称并将名称发送到实际进程等),但是这个应该为您的示例提供足够的功能。但是,在您提出自己的希望之前,请记住,只有在共享“静态”实例(一旦您在多处理环境中调用该实例后,它不会更改其初始状态的实例)时,它才会起作用。如果A.cal
方法要更改vl
属性的内部状态 - 它只会影响其更改的实例(除非主要实例在调用之间调用Pool
时发生更改)。如果你想分享状态,你可以升级parallel_call
在通话结束后拿起instance.__dict__
,并将其与方法调用结果一起返回,然后在主叫方,你必须更新本地__dict__
,返回的数据为改变原来的状态。这还不够 - 你实际上必须创建一个共享字典并处理所有的互斥体工作人员让它可以被所有进程同时访问(你可以使用multiprocessing.Manager
)。
所以,我刚才说了,超过其价值的麻烦......
使用'如果__name__ == '__main __''后卫。 – user2357112
这似乎并不需要多处理。你在使用更大的数据还是什么?或不同的方法?产生执行一个乘法的过程的开销并不值得。 – acushner
一个类似的问题出现在'joblib'前面;使用numpy会更快,因为新流程的复制和上下文切换工作量很大。 https://stackoverflow.com/questions/44084513/joblib-simple-example-parallel-example-slower-than-simple/44084595?noredirect=1#comment75345269_44084595 –