2016-08-12 102 views
2

我想了解,是否有可能运行asyncio.Server实例,而事件循环已经运行了run_forever方法(从一个单独的线程, 当然)。 据我所知,服务器可以启动loop.run_until_complete(asyncio.start_server(...))await asyncio.start_server(...),如果循环已经运行。 第一种方法对我来说是不可接受的,因为循环已经由run_forever方法运行。但是我也不能使用await表达式,因为我要从“循环区域”之外开始它(例如,从主要方法,它不能被标记为异步,对不对?)是否有可能运行asyncio.Server实例,而事件循环已经在运行

def loop_thread(loop): 
    asyncio.set_event_loop(loop) 
    try: 
     loop.run_forever() 
    finally: 
     loop.close() 
     print("loop clesed") 

class SchedulerTestManager: 
    def __init__(self): 
     ... 

     self.loop = asyncio.get_event_loop() 
     self.servers_loop_thread = threading.Thread(
      target=loop_thread, args=(self.loop,)) 
     ... 

    def start_test(self): 
     self.servers_loop_thread.start() 
     return self.servers_loop_thread 

    def add_router(self, router): 
     r = self.endpoint.add_router(router) 
     host = router.ConnectionParameters.Host 
     port = router.ConnectionParameters.Port 
     srv = TcpServer(host, port) 
     server_coro = asyncio.start_server(
      self.handle_connection, self.host, self.port) 
     # does not work since add_router is not async 
     # self.server = await server_coro 
     # does not work, since the loop is already running 
     # self.server = self.loop.run_until_complete(server_coro) 
     return r 


def maind(): 
    st_manager = SchedulerTestManager() 
    thread = st_manager.start_test() 
    router = st_manager.add_router(router) 

最简单的解决方案是在开始测试(运行循环)之前添加所有路由器(服务器)。但我想尝试实现它,所以当测试已经运行时可以添加路由器。我认为loop.call_sooncall_soon_threadsafe)方法可以帮助我,但它似乎不能说明协程,而只是一个简单的函数。

希望我的解释不是很混乱。提前致谢!

回答

1

对于在一个线程中执行的事件循环与在其他线程中执行的常规旧线程代码之间的通信,您可以使用janus库。

这是一个有两个接口的队列:异步和线程安全同步。

这是用法示例:

import asyncio 
import janus 

loop = asyncio.get_event_loop() 
queue = janus.Queue(loop=loop) 

def threaded(sync_q): 
    for i in range(100): 
     sync_q.put(i) 
    sync_q.join() 

@asyncio.coroutine 
def async_coro(async_q): 
    for i in range(100): 
     val = yield from async_q.get() 
     assert val == i 
     async_q.task_done() 

fut = loop.run_in_executor(None, threaded, queue.sync_q) 
loop.run_until_complete(async_coro(queue.async_q)) 
loop.run_until_complete(fut) 

您可以创建一个循环从队列中等待新的消息,并要求启动新服务器的任务。其他线程可能会将新消息推入队列以请求新的服务器。

+0

谢谢!似乎这就是我要找的。 –

相关问题