2017-05-25 161 views
0

最初多处理,我有一个类存储一些处理后的值和重新使用那些与它的其它方法。呼叫类方法的Python

问题是,当我试图将类方法分成多个进程加速,python衍生的进程,但似乎没有工作(正如我在任务管理器中看到,只有1个进程正在运行),结果是从来没有交付。

我做了几个搜索,发现pathos.multiprocessing可以做到这一点,而不是,但我不知道是否标准库就可以解决这个问题呢?

from multiprocessing import Pool 

class A(): 
    def __init__(self, vl): 
     self.vl = vl 
    def cal(self, nb): 
     return nb * self.vl 
    def run(self, dt): 
     t = Pool(processes=4) 
     rs = t.map(self.cal, dt) 
     t.close() 
     return t 

a = A(2) 

a.run(list(range(10))) 
+2

使用'如果__name__ == '__main __''后卫。 – user2357112

+0

这似乎并不需要多处理。你在使用更大的数据还是什么?或不同的方法?产生执行一个乘法的过程的开销并不值得。 – acushner

+0

一个类似的问题出现在'joblib'前面;使用numpy会更快,因为新流程的复制和上下文切换工作量很大。 https://stackoverflow.com/questions/44084513/joblib-simple-example-parallel-example-slower-than-simple/44084595?noredirect=1#comment75345269_44084595 –

回答

2

你的代码失败,因为它无法pickle实例方法(self.cal),这是Python的试图通过它们映射到multiprocessing.Pool(当然,是有办法做到这一点,当你产卵多个进程做,但它的方式太令人费解,而不是非常有用反正) - 因为有它有“包”的数据并将其发送到生成的过程,拆包没有共享内存的访问。如果您试图腌制a实例,则会发生同样的情况。

multiprocessing包唯一可用的共享内存访问是一个鲜为人知的multiprocessing.pool.ThreadPool所以如果你真的想这样做:

from multiprocessing.pool import ThreadPool 

class A(): 
    def __init__(self, vl): 
     self.vl = vl 
    def cal(self, nb): 
     return nb * self.vl 
    def run(self, dt): 
     t = ThreadPool(processes=4) 
     rs = t.map(self.cal, dt) 
     t.close() 
     return rs 

a = A(2) 
print(a.run(list(range(10)))) 
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

但是,这不会给你的并行化,因为它本质上映射到你的正常的线程它们可以访问共享内存。你应该通过类/静态方法,而不是(如果你需要他们的称呼)伴随着这些数据,您希望他们与(在你的情况self.vl)工作。如果您需要跨进程共享数据,则必须使用一些共享内存抽象,如multiprocessing.Value,当然应用互斥量。

UPDATE

我说你可以做到这一点(也有或多或少都在做它的模块,检查pathos.multiprocessing为例),但我不认为这是值得的麻烦 - 当你来到你必须欺骗你的系统做你想做的事情,有可能是你使用了错误的系统,或者你应该重新考虑你的设计。但是,对于信息性的缘故,这里是做你想做的事,一个多设置一个办法:

import sys 
from multiprocessing import Pool 

def parallel_call(params): # a helper for calling 'remote' instances 
    cls = getattr(sys.modules[__name__], params[0]) # get our class type 
    instance = cls.__new__(cls) # create a new instance without invoking __init__ 
    instance.__dict__ = params[1] # apply the passed state to the new instance 
    method = getattr(instance, params[2]) # get the requested method 
    args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]] 
    return method(*args) # expand arguments, call our method and return the result 

class A(object): 

    def __init__(self, vl): 
     self.vl = vl 

    def cal(self, nb): 
     return nb * self.vl 

    def run(self, dt): 
     t = Pool(processes=4) 
     rs = t.map(parallel_call, self.prepare_call("cal", dt)) 
     t.close() 
     return rs 

    def prepare_call(self, name, args): # creates a 'remote call' package for each argument 
     for arg in args: 
      yield [self.__class__.__name__, self.__dict__, name, arg] 

if __name__ == "__main__": # important protection for cross-platform use 
    a = A(2) 
    print(a.run(list(range(10)))) 
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

我认为这是非常自我解释它是如何工作的,但总之它通过你的类的名称,其当前状态(没有信号,寿),被称为一个期望的方法和参数与一个parallel_call函数被称为在Pool每个过程调用它。蟒自动泡菜和unpickles所有这些数据,因此所有parallel_call需要做的是重建原始对象,找到在它的期望的方法和使用所提供的参数(或多个)调用它。

这样,我们只传递数据而不尝试传递活动对象,所以Python不会抱怨(在这种情况下,尝试将实例方法的引用添加到类参数并查看会发生什么情况)以及一切正常。

如果你想要沉迷于'魔法',你可以使它看起来完全像你的代码(创建你自己的Pool处理程序,从函数中取出名称并将名称发送到实际进程等),但是这个应该为您的示例提供足够的功能。但是,在您提出自己的希望之前,请记住,只有在共享“静态”实例(一旦您在多处理环境中调用该实例后,它不会更改其初始状态的实例)时,它才会起作用。如果A.cal方法要更改vl属性的内部状态 - 它只会影响其更改的实例(除非主要实例在调用之间调用Pool时发生更改)。如果你想分享状态,你可以升级parallel_call在通话结束后拿起instance.__dict__,并将其与方法调用结果一起返回,然后在主叫方,你必须更新本地__dict__,返回的数据为改变原来的状态。这还不够 - 你实际上必须创建一个共享字典并处理所有的互斥体工作人员让它可以被所有进程同时访问(你可以使用multiprocessing.Manager)。

所以,我刚才说了,超过其价值的麻烦......

+0

所以在我的情况下,没有解决方案,我们可以使用一个类方法在类中使用共享内存来垃圾多个进程,对吧?是否正确的方法只能在课堂之外带来产卵方法? – Gotte

+0

有一个解决方案,我刚刚更新了我的答案。它很笨重,但并不是那么好,但它的工作原理......然后,我会建议重新考虑你的设计,这样你就不必处理共享实例的状态。没有理由让自己比自己更难... – zwer

+0

谢谢你的帮助。我加了你的代码,它运行完美,但我认为我应该重新设计我的代码,就像你的建议:) – Gotte

0

问题:似乎没有工作(因为我只有1个进程正在运行任务管理器中看到的) 并且结果从未交付。

只看到1过程Pool计算用于处理的数目如下:
你给range(10) =任务索引0..9,因此Pool计算(10/4) * 4 = 8+1 = 9
在开始第一个process之后,不再有任务了。
使用range(32),您将看到 process正在运行。

您将返回return t,而不是返回rs = pool.map(...的结果。


这将工作,例如

def cal(self, nb): 
    import os 
    print('pid:{} cal({})'.format(os.getpid(), nb)) 
    return nb * self.vl 

def run(self,df): 
    with mp.Pool(processes=4) as pool: 
     rs = pool.map(self.cal, df) 
    pool.close() 
    return rs 

if __name__ == '__main__': 
    a = A(2) 
    result = a.run(list(range(32))) 
    print(result) 

与Python测试:3.4.2