0
我必须将数据从测试用例传递到模拟服务器。 这样做的最好方法是什么?将数据异步传递给Python服务器类
这是我迄今为止
mock_server.py
class ThreadedUDPServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
pass
class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler):
def __init__(self, request, client_address, server):
SocketServer.BaseRequestHandler.__init__(self,request,client_address,server)
def handle(self):
print server.data #this is where i need the data
class server_wrap:
def __init__(self):
self.server = ThreadedUDPServer(("127.0.0.1",49555) , ThreadedUDPRequestHandler)
def set_data(self,data)
self.server.data = data
def start(self)
server_thread = threading.Thread(target=self.server.serve_forever())
def stop(self)
self.server.shutdown()
test_mock的.py
server_inst = server_wrap()
server_inst.start()
#code which sets the data and expects the handle method to print the data set
server_inst.stop()
,我有这个代码的问题是,在执行在server_inst.start()处停止,服务器进入无限侦听模式
,我曾尝试其他解决方案,但没有成功
- 使用全局变量
- 使用队列
- 开始mock_server.py 有自己主要
让我了解其他可能的解决方案。在此先感谢
更新1:
使用单独的线程将数据发送到套接字:
变化
test_mock.py
def test_set_data(data)
server_inst = server_wrap()
server_inst.set_data(data)
server_inst.start()
if __name__ == "__main__":
thread = Thread(target=test_set_data, args=("foo_data))
thread.setDaemon(True)
thread.start()
#test code which verifies if data set is same
#works so far, able to pass data
#problem starts now
thread = Thread(target=test_set_data, args=("bar_data))
thread.setDaemon(True)
thread.start()
#says address already in use error
#Tried calling server.shuddown() in handle , but error persists. Also there is no thread.shop in threading.Thread object
感谢
感谢调用它,就会给它一个尝试,并更新你 –
我对着address_reuse_error由于不能够关闭线程。我已经更新了这个问题来反映这一点。谢谢 –
你什么时候得到地址重用? –