我需要每秒调用一次任务(例如)来轮询某个硬件上的某些传感器数据。在单元测试中,我想要做的就是检查是否调用了正确的方法,并且错误(例如传感器已被炸毁或消失)确实被捕获。在asyncio中测试永久运行的任务
这里是一个玩具例如模仿真正的代码:
import pytest
import asyncio
import mock
async def ook(func):
while True:
await asyncio.sleep(1)
func()
@pytest.mark.asyncio
async def test_ook():
func = mock.Mock()
await ook(func)
assert func.called is True
正如预期的那样,运行此将永远阻塞。
如何取消ook
任务,以便单元测试不会阻止?
解决方法是将循环拆分为另一个函数并将其定义为不可测试。我想避免这样做。请注意,与func
搞混(要打电话loop.close()
或其他一些)不起作用,因为它只是在那里玩具示例测试可以断言的东西。