2016-11-08 69 views
1

add_done_callback方法最近被添加到分布式Future对象中,它允许您在将来完成后采取一些操作,而不管它是否成功。如果你想直接调用任何方法resultexceptiontraceback传递的未来对象如何在回调中获得未来的结果?

http://distributed.readthedocs.io/en/latest/api.html?highlight=add_done_callback#distributed.client.Future.add_done_callback

回调函数将挂起。

的例外,回溯可以但是在回调访问如下: fut._exception().result() fut._traceback().result()

尝试,结果同样的模式 - 即fut._result().result()引发了一个异常:

File "C:\Python\lib\site-packages\tornado\concurrent.py", line 316, in _check_done 
    raise Exception("DummyFuture does not support blocking for results") 
Exception: DummyFuture does not support blocking for results 

如果不能在回调中访问未来的结果,能够添加回调对我来说是有限的使用。

我错过了什么 - 有没有办法在回调中获得未来的结果?

在ASYNCIO文档似乎举个例子,其中直接访问result方法是可行的:

​​

...我不知道如何与龙卷风/分布式的,但它将是非常有用能够做到这一点。

from distributed import Client 


client = Client("127.0.0.1:8786") 

def f(delay): 
    from time import sleep 
    from numpy.random import randn 
    sleep(delay) 
    if randn() > 1: 
     1/0 
    return delay 

def callback(fut): 
    import logging 
    logger = logging.getLogger('distributed') 
    if fut.status == 'finished': 
     res = future._result().result() # <-------------- Doesn't work! 
     logger.info("{!r} - {!s}".format(fut, res)) 
    else: 
     logger.info("{!r} - {!s}".format(fut, fut.status)) 


args = rand(10) 
futs = client.map(f, args) 
for fut in futs: 
    fut.add_done_callback(callback) 

回答

1

当前您的回调在Tornado事件循环中被调用。如果你想获得未来的结果,你将不得不使用Tornado API。

下面是一个小例子:

In [1]: from distributed import Client 
In [2]: client = Client() 
In [3]: def inc(x): 
    ...:  return x + 1 
    ...: 
In [4]: from tornado import gen 

In [5]: @gen.coroutine 
    ...: def callback(future): 
    ...:  result = yield future._result() 
    ...:  print(result * 10) 
    ...:  
In [6]: future = client.submit(inc, 1) 

In [7]: future.add_done_callback(callback) 

20 

然而,你的问题突出了也许这并不是用户与add_done_callback交互最直观的方式,因此,如果我们引入了一个破我也不会感到惊讶更改为更高版本。

In [8]: import distributed 

In [8]: distributed.__version__ 
Out[8]: '1.14.0'