2013-12-13 37 views
11

我正在尝试将multiprocessing添加到某些代码中,这些代码的功能无法修改。我想以异步方式将这些函数作为作业提交给多处理池。我正在做一些很像代码here的代码。但是,我不确定如何跟踪结果。如何知道返回的结果对应哪个应用函数?如何跟踪从多处理池返回的异步结果

需要强调的重要一点是,我无法修改现有功能(其他情况依赖于它们保持原样),并且可以按照与函数作业的应用顺序不同的顺序返回结果游泳池。

感谢您的任何想法!

编辑:一些尝试代码如下:

import multiprocessing 
from multiprocessing import Pool 
import os 
import signal 
import time 
import inspect 

def multiply(multiplicand1=0, multiplicand2=0): 
    return multiplicand1*multiplicand2 

def workFunctionTest(**kwargs): 
    time.sleep(3) 
    return kwargs 

def printHR(object): 
    """ 
    This function prints a specified object in a human readable way. 
    """ 
    # dictionary 
    if isinstance(object, dict): 
     for key, value in sorted(object.items()): 
      print u'{a1}: {a2}'.format(a1=key, a2=value) 
    # list or tuple 
    elif isinstance(object, list) or isinstance(object, tuple): 
     for element in object: 
      print element 
    # other 
    else: 
     print object 

class Job(object): 
    def __init__(
     self, 
     workFunction=workFunctionTest, 
     workFunctionKeywordArguments={'testString': "hello world"}, 
     workFunctionTimeout=1, 
     naturalLanguageString=None, 
     classInstance=None, 
     resultGetter=None, 
     result=None 
     ): 
     self.workFunction=workFunction 
     self.workFunctionKeywordArguments=workFunctionKeywordArguments 
     self.workFunctionTimeout=workFunctionTimeout 
     self.naturalLanguageString=naturalLanguageString 
     self.classInstance=self.__class__.__name__ 
     self.resultGetter=resultGetter 
     self.result=result 
    def description(self): 
     descriptionString="" 
     for key, value in sorted(vars(self).items()): 
      descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value)) 
     return descriptionString 
    def printout(self): 
     """ 
     This method prints a dictionary of all data attributes. 
     """ 
     printHR(vars(self)) 

class JobGroup(object): 
    """ 
    This class acts as a container for jobs. The data attribute jobs is a list of job objects. 
    """ 
    def __init__(
     self, 
     jobs=None, 
     naturalLanguageString="null", 
     classInstance=None, 
     result=None 
     ): 
     self.jobs=jobs 
     self.naturalLanguageString=naturalLanguageString 
     self.classInstance=self.__class__.__name__ 
     self.result=result 
    def description(self): 
     descriptionString="" 
     for key, value in sorted(vars(self).items()): 
      descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value)) 
     return descriptionString 
    def printout(self): 
     """ 
     This method prints a dictionary of all data attributes. 
     """ 
     printHR(vars(self)) 

def initialise_processes(): 
    signal.signal(signal.SIGINT, signal.SIG_IGN) 

def execute(
     jobObject=None, 
     numberOfProcesses=multiprocessing.cpu_count() 
     ): 
     # Determine the current function name. 
    functionName=str(inspect.stack()[0][3]) 
    def collateResults(result): 
     """ 
     This is a process pool callback function which collates a list of results returned. 
     """ 
     # Determine the caller function name. 
     functionName=str(inspect.stack()[1][3]) 
     print("{a1}: result: {a2}".format(a1=functionName, a2=result)) 
     results.append(result) 
    def getResults(job): 
     # Determine the current function name. 
     functionName=str(inspect.stack()[0][3]) 
     while True: 
      try: 
       result=job.resultGetter.get(job.workFunctionTimeout) 
       break 
      except multiprocessing.TimeoutError: 
       print("{a1}: subprocess timeout for job".format(a1=functionName, a2=job.description())) 
     #job.result=result 
     return result 
    # Create a process pool. 
    pool1 = multiprocessing.Pool(numberOfProcesses, initialise_processes) 
    print("{a1}: pool {a2} of {a3} processes created".format(a1=functionName, a2=str(pool1), a3=str(numberOfProcesses))) 
    # Unpack the input job object and submit it to the process pool. 
    print("{a1}: unpacking and applying job object {a2} to pool...".format(a1=functionName, a2=jobObject)) 
    if isinstance(jobObject, Job): 
     # If the input job object is a job, apply it to the pool with its associated timeout specification. 
     # Return a list of results. 
     job=jobObject 
     print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description())) 
     # Apply the job to the pool, saving the object pool.ApplyResult to the job object. 
     job.resultGetter=pool1.apply_async(
       func=job.workFunction, 
       kwds=job.workFunctionKeywordArguments 
     ) 
     # Get results. 
     # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result. 
     print("{a1}: getting results for job...".format(a1=functionName)) 
     job.result=getResults(job) 
     print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description())) 
     print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result)) 
     # Return the job result from execute. 
     return job.result 
     pool1.terminate() 
     pool1.join() 
    elif isinstance(jobObject, JobGroup): 
     # If the input job object is a job group, cycle through each job and apply it to the pool with its associated timeout specification. 
     for job in jobObject.jobs: 
      print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description())) 
      # Apply the job to the pool, saving the object pool.ApplyResult to the job object. 
      job.resultGetter=pool1.apply_async(
        func=job.workFunction, 
        kwds=job.workFunctionKeywordArguments 
      ) 
     # Get results. 
     # Cycle through each job and and append the result for the job to a list of results. 
     results=[] 
     for job in jobObject.jobs: 
      # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result. 
      print("{a1}: getting results for job...".format(a1=functionName)) 
      job.result=getResults(job) 
      print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description())) 
      #print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result)) 
      # Collate the results. 
      results.append(job.result) 
     # Apply the list of results to the job group data attribute results. 
     jobObject.results=results 
     print("{a1}: job group results: {a2}".format(a1=functionName, a2=jobObject.results)) 
     # Return the job result list from execute. 
     return jobObject.results 
     pool1.terminate() 
     pool1.join() 
    else: 
     # invalid input object 
     print("{a1}: invalid job object {a2}".format(a1=functionName, a2=jobObject)) 

def main(): 
    print('-'*80) 
    print("MULTIPROCESSING SYSTEM DEMONSTRATION\n") 

    # Create a job. 
    print("# creating a job...\n") 
    job1=Job(
      workFunction=workFunctionTest, 
      workFunctionKeywordArguments={'testString': "hello world"}, 
      workFunctionTimeout=4 
    ) 
    print("- printout of new job object:") 
    job1.printout() 
    print("\n- printout of new job object in logging format:") 
    print job1.description() 

    # Create another job. 
    print("\n# creating another job...\n") 
    job2=Job(
      workFunction=multiply, 
      workFunctionKeywordArguments={'multiplicand1': 2, 'multiplicand2': 3}, 
      workFunctionTimeout=6 
    ) 
    print("- printout of new job object:") 
    job2.printout() 
    print("\n- printout of new job object in logging format:") 
    print job2.description() 

    # Create a JobGroup object. 
    print("\n# creating a job group (of jobs 1 and 2)...\n") 
    jobGroup1=JobGroup(
      jobs=[job1, job2], 
    ) 
    print("- printout of new job group object:") 
    jobGroup1.printout() 
    print("\n- printout of new job group object in logging format:") 
    print jobGroup1.description() 

    # Submit the job group. 
    print("\nready to submit job group") 
    response=raw_input("\nPress Enter to continue...\n") 
    execute(jobGroup1) 

    response=raw_input("\nNote the results printed above. Press Enter to continue the demonstration.\n") 

    # Demonstrate timeout. 
    print("\n # creating a new job in order to demonstrate timeout functionality...\n") 
    job3=Job(
      workFunction=workFunctionTest, 
      workFunctionKeywordArguments={'testString': "hello world"}, 
      workFunctionTimeout=1 
    ) 
    print("- printout of new job object:") 
    job3.printout() 
    print("\n- printout of new job object in logging format:") 
    print job3.description() 
    print("\nNote the timeout specification of only 1 second.") 

    # Submit the job. 
    print("\nready to submit job") 
    response=raw_input("\nPress Enter to continue...\n") 
    execute(job3) 

    response=raw_input("\nNote the recognition of timeouts printed above. This concludes the demonstration.") 
    print('-'*80) 

if __name__ == '__main__': 
    main() 

编辑:这个问题已经被置于[暂停]以下陈述的原因:

“的问题询问码必须表明对所解决问题的最低限度理解,包括尝试解决方案,为什么他们不工作,以及预期结果。另请参阅:Stack Overflow question checklist

这个问题不是请求代码;它正在请求思想,一般指导。展示了对所考虑问题的最小理解(注意术语“多处理”,“池”和“异步”的正确使用以及注意事项the reference to prior code)。关于尝试的解决方案,我承认尝试解决方案的努力将会是有益的。我现在添加了这样的代码。我希望我已经解决了导致[搁置]地位的问题。

回答

14

没有看到实际的代码,我只能回答一般性问题。但是有两个通用的解决方案。

首先,不是使用callback而是忽略AsyncResult,将它们存储在某种集合中。那么你可以使用该集合。例如,如果您希望能够查找结果为使用该功能键的功能,只需创建一个功能键一dict

def in_parallel(funcs): 
    results = {} 
    pool = mp.Pool() 
    for func in funcs: 
     results[func] = pool.apply_async(func) 
    pool.close() 
    pool.join() 
    return {func: result.get() for func, result in results.items()} 

或者,您可以更改回调函数存储在按键收集结果。例如:

def in_parallel(funcs): 
    results = {} 
    pool = mp.Pool() 
    for func in funcs: 
     def callback(result, func=func): 
      results[func] = result 
     pool.apply_async(func, callback=callback) 
    pool.close() 
    pool.join() 
    return results 

我使用的功能本身作为重点。但是你想要使用索引,这很简单。你有任何价值,你都可以用作钥匙。


同时,你链接的例子其实只是呼吁一堆参数相同的功能,等待它们全部完成,并以任意顺序离开导致一些迭代。这正是imap_unordered所做的,但更简单。你可以用这个代替从链接的代码整个复杂的事情:

pool = mp.Pool() 
results = list(pool.imap_unordered(foo_pool, range(10))) 
pool.close() 
pool.join() 

然后,如果你想在原来的顺序,而不是以任意顺序的结果,你可以切换到imapmap代替。所以:

pool = mp.Pool() 
results = pool.map(foo_pool, range(10)) 
pool.close() 
pool.join() 

如果您需要类似的,但过于复杂,融入map范式的东西,concurrent.futures将可能使你的生活比multiprocessing容易。如果您使用Python 2.x,则必须安装the backport。但你可以做的事情,是更难做AsyncResult S或callback S(或map),例如撰写了一大堆期货成一个大的未来。请参阅链接文档中的示例。


最后一个音符:

强调的是,我不能修改现有的功能,最重要的点...

如果您不能修改功能,您可以随时包裹它。例如,假设我有一个函数可以返回数字的平方,但是我试图以异步方式为它们的正方形构建一个dict映射数字,所以我需要将原始数字作为结果的一部分。这很简单:

def number_and_square(x): 
    return x, square(x) 

而现在,我可以只是apply_async(number_and_square)而不仅仅是square,得到我想要的结果。

我没有这样做,在上面的例子中,因为在第一种情况下我存储在钥匙插入从呼叫方收集,并在第二个地方,我必将成回调函数。但结合它绕成函数的包装是一样任一一样简单,而当这些都不是可以适当。

+0

非常感谢您对您的明确建议和指导。如你所说,我用函数包装和您的使用功能作为结果的字典中的关键的伎俩试验。 ['''concurrent.futures'''(http://docs.python.org/3/library/concurrent.futures.html)看起来很有希望,我会尽快展开调查,太。再次感谢。 – d3pd

+2

感谢您的回答。我有一个非常类似的问题没有解决。如果您需要能够在每个任务上设置超时并想知道哪些输入导致超时,该怎么办? – chrishiestand