2016-11-18 59 views
0

所以我试图找出一个通用的解决方案,它将收集函数中的所有值并将它们附加到稍后可访问的列表中。这将在concurrent.futuresthreading类型任务期间使用。这里是一个解决方案,我已经使用全局master_list如何在不使用全局变量的情况下在多线程中收集函数返回值?

from concurrent.futures import ThreadPoolExecutor 

master_list = [] 
def return_from_multithreaded(func): 
    # master_list = [] 
    def wrapper(*args, **kwargs): 
     # nonlocal master_list 
     global master_list 
     master_list += func(*args, **kwargs) 
    return wrapper 


@return_from_multithreaded 
def f(n): 
    return [n] 


with ThreadPoolExecutor(max_workers=20) as exec: 
    exec.map(f, range(1, 100)) 

print(master_list) 

我想找到一个解决方案,不包括全局,也许可以返回注释掉master_list存储为一个封闭?

回答

2

如果你不想使用全局变量,不要丢弃的map结果。 map让你回到每个函数返回的值,你只是忽略它们。这段代码可以通过使用map根据其预期目的进行简单得多:

def f(n): 
    return n # No need to wrap in list 

with ThreadPoolExecutor(max_workers=20) as exec: 
    master_list = list(exec.map(f, range(1, 100))) 

print(master_list) 

如果你需要一个master_list显示,到目前为止(也许其他线程都在注视着它)计算出的结果,你只要把循环明确:

def f(n): 
    return n # No need to wrap in list 

master_list = [] 
with ThreadPoolExecutor(max_workers=20) as exec: 
    for result in exec.map(f, range(1, 100)): 
     master_list.append(result) 

print(master_list) 

这是Executor模型的设计目的;普通线程并不打算返回值,但Executors提供了一个返回值的通道,所以你不必自己管理它。在内部,这是使用某种形式的队列或其他元数据来保持结果的顺序,但您不需要处理这种复杂性;从你的角度来看,它相当于常规的map函数,它只是平行工作。


更新覆盖处理异常:

map将提高在工人提出的任何异常时,结果被击中。因此,如书面所述,如果任何任务失败,则第一组代码将不会存储任何内容(list将部分构建,但在异常提升时丢弃)。第二个例子只会在抛出第一个异常之前保留结果,其余部分将被丢弃(您必须存储迭代器map并使用一些不太合适的代码来避免)。如果你需要存储所有成功的结果,忽略失败(或者只是记录它们以某种方式),这是最容易使用submit创建Futurelist一个对象,然后等待他们,顺序或按完成的顺序,包裹.result()请拨打try/except以避免丢弃良好结果。例如,存储的结果提交的顺序,你会怎么做:

master_list = [] 
with ThreadPoolExecutor(max_workers=20) as exec: 
    futures = [exec.submit(f, i) for i in range(1, 100)] 
    exec.shutdown(False) # Optional: workers terminate as soon as all futures finish, 
          # rather than waiting for all results to be processed 
    for fut in futures: 
     try: 
      master_list.append(fut.result()) 
     except Exception: 
      ... log error here ... 

为了更有效的代码,你可以完成,没有提交的顺序检索结果,使用concurrent.futures.as_completed热切检索结果他们完成。从上面的代码中唯一的变化是:

for fut in futures: 

变为:

for fut in concurrent.futures.as_completed(futures): 

其中as_completed尽快做的yield工作ING完成/取消期货作为他们完整的,而不是推迟,直到所有期货提交之前完成并得到处理。

有更复杂的选项涉及使用add_done_callback,所以主线程根本不涉及显式处理结果,但这通常是不必要的,并且经常会令人困惑,因此如果可能,最好避免使用。

+0

+1用于分享好的信息。我有一个疑问,如果通过函数引发异常,它的行为如何?它处理了吗? –

+0

@Moinuddin根据我的经验,使用ThreadPoolExcecutors而不是'map'来处理错误,您可以使用submit来返回未来,然后在完成后调用future.result()。这会引发任何被发现的例外。 – flybonzai

+0

@flybonzai:亚尔。创建'Future'的'list',然后,如果结果顺序很重要,只需循环访问'list'并调用'result'(包含在'try' /'except'中以处理worker中出现的异常)。如果结果顺序无关紧要,[使用'concurrent.futures.as_completed'](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed),这会产生'未来的对象,因为他们完成(成功或由于例外);再次,你可以在'try' /'except'块中调用'result'来处理错误。如果命令不重要,后者通常更有效率。 – ShadowRanger

2

我以前遇到过这个问题:Running multiple asynchronous function and get the returned value of each function。这是我的方法去做:

def async_call(func_list): 
    """ 
    Runs the list of function asynchronously. 

    :param func_list: Expects list of lists to be of format 
     [[func1, args1, kwargs1], [func2, args2, kwargs2], ...] 
    :return: List of output of the functions 
     [output1, output2, ...] 
    """ 
    def worker(function, f_args, f_kwargs, queue, index): 
     """ 
     Runs the function and appends the output to list, and the Exception in the case of error 
     """ 
     response = { 
      'index': index, # For tracking the index of each function in actual list. 
          # Since, this function is called asynchronously, order in 
          # queue may differ 
      'data': None, 
      'error': None 
     } 

     # Handle error in the function call 
     try: 
      response['data'] = function(*f_args, **f_kwargs) 
     except Exception as e: 
      response['error'] = e # send back the exception along with the queue 

     queue.put(response) 
    queue = Queue() 
    processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \ 
        for i, (func, args, kwargs) in enumerate(func_list)] 

    for process in processes: 
     process.start() 

    response_list = [] 
    for process in processes: 
     # Wait for process to finish 
     process.join() 

     # Get back the response from the queue 
     response = queue.get() 
     if response['error']: 
      raise response['error'] # Raise exception if the function call failed 
     response_list.append(response) 

    return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])] 

采样运行:

def my_sum(x, y): 
    return x + y 

def your_mul(x, y): 
    return x*y 

my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]] 

async_call(my_func_list) 
# Value returned: [3, 2] 
相关问题