2014-10-31 84 views
2

我想要创建一个函数,该函数在给定函数和相对参数列表的情况下,尽可能多地启动进程以并行化这些任务。正在运行的进程数量不能超过我的CPU的核心数量。当一个过程完成时,它应该被另一个替换,直到结束。在多个CPU环境中并行处理大量函数

我试图用python Pools来实现这样的事情。这是我的功能:

from multiprocessing import Pool, cpu_count 

CPUS = cpu_count() 

def parallelize(functions, args): 
    results = [] 
    if CPUS > 1: 
     for i in xrange(0, len(functions), CPUS): 
      pool = Pool() 
      for j in xrange(CPUS): 
       if i + j >= len(functions): 
        break 
       results.append(pool.apply_async(functions[i + j], args = args[i + j])) 
      pool.close() 
      pool.join() 
     map(lambda x: x.get(), results) 
    else: 
     for i in xrange(len(functions)): 
      results.append(functions[i](*args[i])) 
    return results 

此实现细分批量函数列表。每个批量维度都等于实际CPU的数量。问题在于,它实际上一直等到每个批量的功能完成,然后再次启动另一批量的过程。
我不想要这种行为,因为如果在批量中有一个非常慢的函数,其他cpus将在开始新进程之前等待它完成。

什么是正确的方法?

+0

@dano'methods'是功能列表。所以'方法[i + j]'是一个函数。它有什么问题? – ProGM 2014-10-31 14:55:32

+0

啊,对不起。我认为这是一个切片,出于某种原因。忽略我:) – dano 2014-10-31 14:56:54

+0

好吧没问题:) – ProGM 2014-10-31 14:57:35

回答

2

看起来你似乎太过复杂了。无论您给它多少工作项,multiprocessing.Pool将始终以您告诉它的进程数完全运行。因此,如果您创建Pool(CPUS),那么Pool将永远不会同时运行超过CPUS个任务,即使您为其提供CPUS * 100任务。因此,如果没有您做任何特殊工作,它可以满足您的要求,即永远不会运行比您拥有CPU更多的任务。因此,您可以遍历整个方法和参数列表,并在其上调用apply_async,而不用担心批量调用。该Pool将同时执行所有的任务,但也决不超过CPUS任务:

def parallelize(methods, args): 
    results = [] 
    if CPUS > 1: 
     pool = Pool(CPUS) 
     for method, arg in zip(methods, args): 
      results.append(pool.apply_async(method, args=arg)) 
     pool.close() 
     pool.join() 
     out = map(lambda x: x.get(), results) 
    else: 
     for i in xrange(len(methods)): 
      results.append(methods[i](*args[i])) 
    return results 
+0

哦,比我想象的简单。谢谢! – ProGM 2014-10-31 15:04:36