2012-05-02 64 views
61

在下面的示例代码中,我想恢复函数worker的返回值。我怎么能这样做呢?这个值在哪里存储?如何恢复传递给multiprocessing.Process的函数的返回值?

示例代码:

import multiprocessing 

def worker(procnum): 
    '''worker function''' 
    print str(procnum) + ' represent!' 
    return procnum 


if __name__ == '__main__': 
    jobs = [] 
    for i in range(5): 
     p = multiprocessing.Process(target=worker, args=(i,)) 
     jobs.append(p) 
     p.start() 

    for proc in jobs: 
     proc.join() 
    print jobs 

输出:

0 represent! 
1 represent! 
2 represent! 
3 represent! 
4 represent! 
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>] 

我似乎无法找到存储在jobs对象相关的属性。

由于提前, BLZ

回答

64

使用shared variable沟通。例如像这样:

import multiprocessing 

def worker(procnum, return_dict): 
    '''worker function''' 
    print str(procnum) + ' represent!' 
    return_dict[procnum] = procnum 


if __name__ == '__main__': 
    manager = multiprocessing.Manager() 
    return_dict = manager.dict() 
    jobs = [] 
    for i in range(5): 
     p = multiprocessing.Process(target=worker, args=(i,return_dict)) 
     jobs.append(p) 
     p.start() 

    for proc in jobs: 
     proc.join() 
    print return_dict.values() 
+16

我建议使用一个['multiprocessing.Queue'](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue),而不是一个'管理器'在这里。使用'Manager'需要产生一个全新的进程,当'Queue'执行时这是过度的。 – dano

+1

@dano:我想知道,如果我们使用Queue()对象,我们不能确定每个进程返回值的顺序。我的意思是如果我们需要结果的顺序,做下一步工作。我们如何确定哪个输出来自哪个进程 – Catbuilts

+4

@Cbubuilts您可以从每个进程返回一个元组,其中一个值是您关心的实际返回值,另一个是来自进程的唯一标识符。但我也想知道为什么你需要知道哪个进程正在返回哪个值。如果你真的需要了解这个过程,或者你需要关联你的输入列表和输出列表?在这种情况下,我建议使用'multiprocessing.Pool.map'来处理你的工作项目清单。 – dano

36

我认为@sega_sai提出的方法是更好的方法。但它确实需要一个代码示例,所以这里有云:

import multiprocessing 
from os import getpid 

def worker(procnum): 
    print 'I am number %d in process %d' % (procnum, getpid()) 
    return getpid() 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes = 3) 
    print pool.map(worker, range(5)) 

,它将打印返回值:

I am number 0 in process 19139 
I am number 1 in process 19138 
I am number 2 in process 19140 
I am number 3 in process 19139 
I am number 4 in process 19140 
[19139, 19138, 19140, 19139, 19140] 

如果您熟悉map(Python的内置),这不应该是太具挑战性。否则看看sega_Sai's link

请注意,只需要很少的代码。 (还请注意过程如何重用)。

+0

任何想法为什么我的'getpid()'返回所有相同的值?我正在运行Python3 – zelusp

+0

我不确定池如何将工作分配给工作人员。如果他们真的很快,也许他们最终都会在同一个员工身上?它是否一致发生?另外,如果你添加一个延迟? – Mark

+0

我也认为这是一个速度相关的东西,但是当我使用超过10个进程提供的'pool.map'范围为1,000,000时,我最多看到两个不同的pid。 – zelusp

5

您可以使用内置的exit来设置进程的退出代码。它可以从exitcode属性的过程中获得:

import multiprocessing 

def worker(procnum): 
    print str(procnum) + ' represent!' 
    exit(procnum) 

if __name__ == '__main__': 
    jobs = [] 
    for i in range(5): 
     p = multiprocessing.Process(target=worker, args=(i,)) 
     jobs.append(p) 
     p.start() 

    result = [] 
    for proc in jobs: 
     proc.join() 
     result.append(proc.exitcode) 
    print result 

输出:

0 represent! 
1 represent! 
2 represent! 
3 represent! 
4 represent! 
[0, 1, 2, 3, 4] 
+2

被警告,这种方法可能会变得混乱。一般情况下,进程应该退出,退出代码为0,完成后没有错误。如果您有任何监控您的系统过程退出代码,那么您可能会将这些报告视为错误。 – ferrouswheel

3

为别人谁是寻求如何使用Queue获得从Process值:

import multiprocessing 

ret = {'foo': False} 

def worker(queue): 
    ret = queue.get() 
    ret['foo'] = True 
    queue.put(ret) 

if __name__ == '__main__': 
    queue = multiprocessing.Queue() 
    queue.put(ret) 
    p = multiprocessing.Process(target=worker, args=(queue,)) 
    p.start() 
    print queue.get() # Prints {"foo": True} 
    p.join() 
+0

当我在我的工作进程中放入某个队列时,我的连接永远不会到达。任何想法可能会如何? –

+0

@LaurensKoppenol你的意思是说你的主代码永久挂在p.join()上,永远不会继续?你的过程是否有无限循环? –

+3

是的,它无限地挂在那里。我的工作人员全部完成(在工作人员功能结束后循环,之后打印声明打印,供所有工作人员使用)。连接不做任何事情。如果我从我的函数中删除'Queue',它确实允许我通过'join()' –

7

这个例子说明如何使用multiprocessing.Pipe实例的列表s至从进程的任意数量的返回字符串:

​​

输出:

0 represent! 
1 represent! 
2 represent! 
3 represent! 
4 represent! 
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!'] 

此解决方案使用更少的资源比使用

或者使用

它是非常有启发性看源每种类型。

+0

如果不将管道设为全局变量,最好的方法是什么? – Nickpick

+0

我把所有的全局数据和代码放到一个主函数中,它的工作原理是一样的。这是否回答你的问题? –

+0

在添加(发送)任何新值之前,总是必须读取管道? – Nickpick

0

我修改vartec的回答有点,因为我需要得到从函数的错误代码。 (感谢vertec !!!它的一个可怕的把戏)

这也可以用manager.list来完成,但我认为最好是在一个字典中存储它并存储一个列表。这样,我们保留函数和结果的方式,因为我们无法确定列表将被填充的顺序。

from multiprocessing import Process 
import time 
import datetime 
import multiprocessing 


def func1(fn, m_list): 
    print 'func1: starting' 
    time.sleep(1) 
    m_list[fn] = "this is the first function" 
    print 'func1: finishing' 
    # return "func1" # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list): 
    print 'func2: starting' 
    time.sleep(3) 
    m_list[fn] = "this is function 2" 
    print 'func2: finishing' 
    # return "func2" 

def func3(fn, m_list): 
    print 'func3: starting' 
    time.sleep(9) 
    # if fail wont join the rest because it never populate the dict 
    # or do a try/except to get something in return. 
    raise ValueError("failed here") 
    # if we want to get the error in the manager dict we can catch the error 
    try: 
     raise ValueError("failed here") 
     m_list[fn] = "this is third" 
    except: 
     m_list[fn] = "this is third and it fail horrible" 
     # print 'func3: finishing' 
     # return "func3" 


def runInParallel(*fns): # * is to accept any input in list 
    start_time = datetime.datetime.now() 
    proc = [] 
    manager = multiprocessing.Manager() 
    m_list = manager.dict() 
    for fn in fns: 
     # print fn 
     # print dir(fn) 
     p = Process(target=fn, name=fn.func_name, args=(fn, m_list)) 
     p.start() 
     proc.append(p) 
    for p in proc: 
     p.join() # 5 is the time out 

    print datetime.datetime.now() - start_time 
    return m_list, proc 

if __name__ == '__main__': 
    manager, proc = runInParallel(func1, func2, func3) 
    # print dir(proc[0]) 
    # print proc[0]._name 
    # print proc[0].name 
    # print proc[0].exitcode 

    # here you can check what did fail 
    for i in proc: 
     print i.name, i.exitcode # name was set up in the Process line 53 

    # here will only show the function that worked and where able to populate the 
    # manager dict 
    for i, j in manager.items(): 
     print dir(i) # things you can do to the function 
     print i, j 
1

出于某种原因,我无法找到如何与Queue任何地方做一个普通的例子(甚至Python的文档例子不产卵多个进程),所以这里是我得到的后像10次尝试工作:

def add_helper(queue, arg1, arg2): # the func called in child processes 
    ret = arg1 + arg2 
    queue.put(ret) 

def multi_add(): # spawns child processes 
    q = Queue() 
    processes = [] 
    rets = [] 
    for _ in range(0, 100): 
     p = Process(target=add_helper, args=(q, 1, 2)) 
     processes.append(p) 
     p.start() 
    for p in processes: 
     ret = q.get() # will block 
     rets.append(ret) 
    for p in processes: 
     p.join() 
    return rets 

Queue是阻塞,线程安全的队列中,你可以用它来返回值从子进程存储。所以你必须将队列传递给每个进程。一些不太明显的是,你们从队列中有get()joinProcess ES否则队列已满,并且块之前的一切。

更新为那些谁是面向对象(在Python 3进行测试。4):

from multiprocessing import Process, Queue 

class Multiprocessor(): 

    def __init__(self): 
     self.processes = [] 
     self.queue = Queue() 

    @staticmethod 
    def _wrapper(func, queue, args, kwargs): 
     ret = func(*args, **kwargs) 
     queue.put(ret) 

    def run(self, func, *args, **kwargs): 
     args2 = [func, self.queue, args, kwargs] 
     p = Process(target=self._wrapper, args=args2) 
     self.processes.append(p) 
     p.start() 

    def wait(self): 
     rets = [] 
     for p in self.processes: 
      ret = self.queue.get() 
      rets.append(ret) 
     for p in self.processes: 
      p.join() 
     return rets 

# tester 
if __name__ == "__main__": 
    mp = Multiprocessor() 
    num_proc = 64 
    for _ in range(num_proc): # queue up multiple tasks running `sum` 
     mp.run(sum, [1, 2, 3, 4, 5]) 
    ret = mp.wait() # get all results 
    print(ret) 
    assert len(ret) == num_proc and all(r == 15 for r in ret) 
相关问题