2017-08-01 56 views
0

我正在考虑在龙卷风中查找多个后端服务器的扇出代理以及在返回之前不等所有响应的可能用例。在龙卷风中放弃期货

如果您使用WaitIterator但在收到有用回复后没有继续等待,剩余期货是否存在问题?

也许其他期货的结果不会被清理?也许回调可能会被添加到任何剩余的期货中以放弃其结果?

#!./venv/bin/python 

from tornado import gen 
from tornado import httpclient 
from tornado import ioloop 
from tornado import web 
import json 


class MainHandler(web.RequestHandler): 
    @gen.coroutine 
    def get(self): 
     r1 = httpclient.HTTPRequest(
      url="http://apihost1.localdomain/api/object/thing", 
      connect_timeout=4.0, 
      request_timeout=4.0, 
     ) 
     r2 = httpclient.HTTPRequest(
      url="http://apihost2.localdomain/api/object/thing", 
      connect_timeout=4.0, 
      request_timeout=4.0, 
     ) 
     http = httpclient.AsyncHTTPClient() 
     wait = gen.WaitIterator(
      r1=http.fetch(r1), 
      r2=http.fetch(r2) 
     ) 
     while not wait.done(): 
      try: 
       reply = yield wait.next() 
      except Exception as e: 
       print("Error {} from {}".format(e, wait.current_future)) 
      else: 
       print("Result {} received from {} at {}".format(
        reply, wait.current_future, 
        wait.current_index)) 
       if reply.code == 200: 
        result = json.loads(reply.body) 
        self.write(json.dumps(dict(result, backend=wait.current_index))) 
        return 


def make_app(): 
    return web.Application([ 
     (r'/', MainHandler) 
    ]) 


if __name__ == '__main__': 
    app = make_app() 
    app.listen(8888) 
    ioloop.IOLoop.current().start() 

回答

0

所以我已经检查了源代码WaitIterator

它跟踪期货添加回调,当被解雇时,迭代器将结果排队或者(如果你叫next())履行未来它给你。

作为未来,您只需致电.next()即可创建,您似乎可以退出while not wait.done(),并且不会在没有观察员的情况下离开任何期货。

引用计数应该允许WaitIterator实例保留,直到所有期货已经开始回调然后回收。

更新2017年8月2日
已经与子类WaitIterator有额外的记录进一步测试,是迭代器都将被清理,当所有的期货回报,但如果其中任何期货返回异常它会记录这个异常没有被观察到。

ERROR:tornado.application:Future exception was never retrieved: HTTPError: HTTP 599: Timeout while connecting

在总结和回答我的问题:在完成WaitIterator是没有必要从一个清理点,但它可能是希望从一个记录点这样做。

如果你想确定,将等待迭代器传递给一个新的未来,将完成消耗它并添加一个观察器就足够了。例如

@gen.coroutine 
def complete_wait_iterator(wait): 
    rounds = 0 
    while not wait.done(): 
     rounds += 1 
     try: 
      reply = yield wait.next() 
     except Exception as e: 
      print("Not needed Error {} from {}".format(e, wait.current_future)) 
     else: 
      print("Not needed result {} received from {} at {}".format(
       reply, wait.current_future, 
       wait.current_index)) 
    log.info('completer finished after {n} rounds'.format(n=rounds)) 


class MainHandler(web.RequestHandler): 
    @gen.coroutine 
    def get(self): 
     r1 = httpclient.HTTPRequest(
      url="http://apihost1.localdomain/api/object/thing", 
      connect_timeout=4.0, 
      request_timeout=4.0, 
     ) 
     r2 = httpclient.HTTPRequest(
      url="http://apihost2.localdomain/api/object/thing", 
      connect_timeout=4.0, 
      request_timeout=4.0, 
     ) 
     http = httpclient.AsyncHTTPClient() 
     wait = gen.WaitIterator(
      r1=http.fetch(r1), 
      r2=http.fetch(r2) 
     ) 
     while not wait.done(): 
      try: 
       reply = yield wait.next() 
      except Exception as e: 
       print("Error {} from {}".format(e, wait.current_future)) 
      else: 
       print("Result {} received from {} at {}".format(
        reply, wait.current_future, 
        wait.current_index)) 
       if reply.code == 200: 
        result = json.loads(reply.body) 
        self.write(json.dumps(dict(result, backend=wait.current_index))) 
        consumer = complete_wait_iterator(wait) 
        consumer.add_done_callback(lambda f: f.exception()) 
        return