2017-05-29 113 views
3

我们正在使用aiohttp构建休息api。我们的应用程序设计为使用户比接收响应更频繁地发送请求(因为计算时间)。对于用户来说只是最新请求的结果很重要。是否有可能停止计算过时的请求?当新请求来自同一用户时,如何取消先前的请求

谢谢

+0

,与其违反HTTP的核心原则之一。如果您想将该服务扩展到负载均衡器后面的多个服务器,该怎么办?那么它会得到一些其他服务器可能处理的非常棘手的取消请求。这可能不是你应该试图解决的实际问题! – deceze

+0

“取消”意味着什么?服务器断开连接? – deceze

+0

重写后:这是否意味着一个HTTP请求可能需要几分钟(?)才能完成?这也是不好的HTTP设计;一个请求应该在几秒钟内最多回答,典型的基准目标是200ms。如果您需要更长时间的运行作业,请将其视为作业:一个HTTP请求启动后台任务并返回任务ID。然后,客户端可以使用另一个HTTP请求查询有关作业状态。例如。 'POST/tasks {params}','GET/tasks/42'。显然,这些后台任务可以通过任何在后端有意义的机制来取消。 – deceze

回答

3

你正在构建一个非常像HTTP的东西。一个HTTP请求不应该花费几毫秒的时间来回答,并且HTTP请求不应该是相互依赖的;如果您需要执行需要相当长时间的计算,可以尝试通过更改体系结构/模型/缓存/任何内容来加速它们,或者将其明确视为可以通过HTTP接口控制的长时间运行作业这意味着“工作”是可以通过HTTP查询的“物理资源”。您可以通过一个POST请求创建资源:

POST /tasks 
Content-Type: application/json 

{"some": "parameters", "go": "here"} 
{"resource": "/tasks/42"} 

然后你就可以查询任务的状态:

GET /tasks/42 
{"status": "pending"} 

,最终得到的结果:

GET /tasks/42 
{"status": "done", "results": [...]} 

当你帖子里面新陈代谢一个新的任务,你的后台可以取消以任何方式旧任务,它认为合适的;该资源将返回“取消”或类似的状态。开始新任务后,您的客户将不会再次查询旧资源。即使您的客户端每秒查询一次资源,它仍然会在服务器上使用更少的资源(一个连接在固定的10秒内打开,而10个连接在相同的时间范围内打开200毫秒),特别是如果你对其应用一些智能缓存。由于您可以独立于HTTP前端扩展任务后端,因此它的可扩展性更高,并且HTTP前端可以简单地缩放到多个服务器和负载平衡器。

0

我会从@ Drizzt1991发布的解决方案:

嘿,阿尔乔姆。你在那里有一个奇怪的要求。 了解如果1个客户端使用保持活动的套接字,在回答第一个请求之前,不可能看到下一个请求。这就是HTTP的工作方式,它期望在发送另一个请求之前的结果。 因此,如果客户端将在2个独立的套接字上运行,而您需要断言来自同一个客户端的2个套接字将在同一台机器上路由,那么您的情况才会起作用。通过实践,这对于故障转移和其他东西并不是那么有效。基本上它将是一个有状态的API。 即使你这么做,并非所有的图书馆都支持取消。传统的关系数据库只会忽略结果,但仍会处理待定查询。如果你使用图遍历来做复杂的事情,并且你有很多步骤,那么你可以取消它。

但是,如果你断言,该客户端使用套接字池,它们被路由到同一台机器,并请求从取消中获益,这样的事情应该做的伎俩:

import asyncio 
import random 

from aiohttp import web 


def get_session_id(request): 
    # I don't know how you do session management, so left it out 
    return "" 


async def handle(request): 
    session_id = get_session_id(request) 
    request['tr_id'] = tr_id = int(random.random() * 1000000) 

    running_tasks = request.app['running_tasks'] 
    if session_id in running_tasks and not running_tasks[session_id].done(): 
     running_tasks[session_id].cancel() 
     del running_tasks[session_id] 

    current_task = asyncio.ensure_future(_handle_impl(request)) 
    running_tasks[session_id] = current_task 

    try: 
     resp = await current_task 
    except asyncio.CancelledError: 
     print("Cancelled request", tr_id) 
     resp = web.Response(text="Cancelled {}".format(tr_id)) 
    finally: 
     if running_tasks[session_id] is current_task: 
      del running_tasks[session_id] 
    return resp 


async def _handle_impl(request): 
    tr_id = request['tr_id'] 
    print("Start request", tr_id) 
    await asyncio.sleep(10) 
    print("Finished request", tr_id) 
    return web.Response(text="Finished {}".format(tr_id)) 


app = web.Application() 
app.router.add_get('/', handle) 
app.router.add_get('/{name}', handle) 

app['running_tasks'] = {} 

web.run_app(app, host="127.0.0.1", port=8080)