2017-05-29 81 views
0

我想利用事件循环监视任何数据插入我的asyncio.Queue手动切换(你可以在这里找到https://github.com/python/cpython/blob/3.6/Lib/asyncio/queues.py它的源代码),但我碰到一些问题。这是下面的代码:有没有办法对ASYNCIO事件循环

import asyncio 
import threading 

async def recv(q): 
    while True: 
     msg = await q.get() 
     print(msg) 

async def checking_task(): 
    while True: 
     await asyncio.sleep(0.1) 

def loop_in_thread(loop,q): 
    asyncio.set_event_loop(loop) 
    asyncio.ensure_future(recv(q)) 
    asyncio.ensure_future(insert(q)) 
    # asyncio.ensure_future(checking_task()) comment this out, and it will work as intended 
    loop.run_forever() 

async def insert(q): 
    print('invoked') 
    await q.put('hello') 

q = asyncio.Queue() 
loop = asyncio.get_event_loop() 
t = threading.Thread(target=loop_in_thread, args=(loop, q,)) 
t.start() 

程序已经开始,我们可以看到以下结果

invoked 
hello 
-> print(asyncio.Task.all_tasks()) 
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39> 
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E215DCFAC8>()]>>} 

但现在,如果我们手动添加数据到q使用q.put_nowait('test'),我们会得到以下结果:

q.put_nowait('test') # a non-async way to add data into queue 
-> print(asyncio.Task.all_tasks()) 
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39> 
wait_for=<Future finished result=None>>} 

正如你可以看到,未来已经结束,但我们还没有打印出新添加的字符串'test'。换句话说,msg = await q.get()仍在等待,即使将来q.get相关的()完成,并没有运行其他任务。这让我困惑,因为官方文档(https://docs.python.org/3/library/asyncio-task.html)中,它说

结果=等待将来或结果=从未来的收益率 - 暂停协程,直到将来完成,然后返回未来的结果

看来,即使未来完成后,我们还需要在其他异步功能某种await使事件循环继续处理任务。

我发现了一个解决此问题的,这是添加checking_task(),并且还添加协程到事件循环;那么它将按预期工作。

但加入checking_task()协同程序对CPU非常昂贵的,因为它只是运行一个while循环。我想知道是否有一些手动方式让我们触发这个await事件,而不使用异步功能。例如,像神奇的东西

q.put_nowait('test') 
loop.ok_you_can_start_running_other_pending_tasks() 

帮助将不胜感激!谢谢。

回答

0

所以我结束了使用

loop.call_soon_threadsafe(q.put_nowait, 'test') 

,并如预期它会奏效。在弄清楚之后,我搜索了一些有关的信息。原来这个帖子(Scheduling an asyncio coroutine from another thread)有同样的问题。而@ KFX的答案也将工作,这是

loop.call_soon_threadsafe(loop.create_task, q.put('test')) 

通知asyncio.Queue.put()是一个协程,但asyncio.Queue.put_nowait()是一个正常功能。

相关问题