2017-07-14 87 views
0

我有需要socketio-server的测试系统(SUT)。该服务器将在某些功能中响应SUT。所以Socketio-server对我的SUT来说是必要的环境。如何从pytest夹具运行aiohttp应用程序

对于socketio-server,我选择了使用python-socketio的aiohttp。功能

比如我socketio服务器:

from aiohttp import web 
import socketio 


sio = socketio.AsyncServer() 
app = web.Application() 
sio.attach(app) 


@sio.on('commands') 
async def message_handler(sid, msg): 
    if msg['data']['command'] == 'terminal_settings_info': 
     response_msg = {'handler': msg['handler'], 
         'data': {'result': 1, 
           'is_success': True, 
           'TerminalID': 5, 
           'request_id': msg['data']['request_id']}} 
     await sio.emit('commands', response_msg) 

    elif msg['data']['command'] == 'get_agent_info': 
     response_msg = {'handler': msg['handler'], 
         'data': {'result': 1, 
           'is_success': True, 
           'terminal_address': 'Zelenograd', 
           'agent_support_phone': '8-888-88-88-88', 
           'mf_retail': True, 
           'request_id': msg['data']['request_id']}} 
     await sio.emit('commands', response_msg) 

    else: 
     raise ValueError('Unknown request.\nMessage: ' + msg.__repr__()) 


web.run_app(app, host='127.0.0.1', port=1234) 

我想设置从pytest灯具测试完成后,该socketio服务器之前运行测试和拆卸。所以,这个服务器必须非阻塞pytest线程。 这是我的目的。 我该怎么做?

我很擅长Python中的pytest和syncrhonous代码执行,但是用于异步编程(asyncio,aiohttp)的newbi和使用线程,子进程或多处理的newbi。

回答