2015-12-02 93 views
0
@asyncio.coroutine 
    def listener(): 
     while True: 
      message = yield from websocket.recieve_message() 
      if message: 
       yield from handle(message) 

loop = asyncio.get_event_loop() 
loop.run_until_complete(listener()) 

假设我使用带有asyncio的websockets。这意味着我收到来自websockets的消息。当我收到一条消息时,我想处理这条消息,但是我用我的代码丢失了所有的异步事物。因为yield from handle(message)正在定义阻塞...我怎么能找到一种方法使其非阻塞?就像,在同一时间处理多个消息。在我可以处理另一条消息之前,不必等待消息被处理。Asyncio和无限循环

谢谢。

+2

通常你需要每个websocket读取任务,写入websocket可能会与其他任务异步执行。 “手柄”也可能是单独的任务。 你的代码不完整,所以你很难得到你需要的东西。 –

+0

如果这是你调用的唯一协程,那么监听器会阻塞,因为它会无限运行(因为while循环)。如果你有另一个同时运行的协同程序(从语句中产生它自己的收益),那么asyncio将在语句收益之间来回反弹,因此它将不再是“阻塞”的。 – shongololo

回答

2

如果你不关心从手柄的消息的返回值,你可以简单地为它创建一个新的任务,将在事件循环旁边的WebSocket的阅读器上运行。下面是一个简单的例子:

@asyncio.coroutine 
def listener(): 
    while True: 
     message = yield from websocket.recieve_message() 
     if message: 
      asyncio.ensure_future(handle(message)) 

ensure_future将创建一个任务,并将其连接到默认的事件循环。由于循环已经在运行,它将与您的websocket阅读器并行处理。事实上,如果它是一个缓慢运行的I/O阻塞任务(如发送电子邮件),您可以轻松地同时运行几十个句柄(消息)任务。它们是在需要时动态创建的,并在完成时销毁(比产生线程的开销低得多)。

如果您想要更多的控制权,您可以简单地在读取器中写入一个asyncio.Queue,并拥有一个固定大小的任务池,它可以消耗队列,这是多线程或多进程中的典型模式节目。

@asyncio.coroutine 
def consumer(queue): 
    while True: 
     message = yield from queue.get() 
     yield from handle(message) 

@asyncio.coroutine 
def listener(queue): 
    for i in range(5): 
     asyncio.ensure_future(consumer(queue)) 
    while True: 
     message = yield from websocket.recieve_message() 
     if message: 
      yield from q.put(message) 

q = asyncio.Queue() 
loop = asyncio.get_event_loop() 
loop.run_until_complete(listener(q))