2014-11-03 61 views
1

我试图实现一个使用高速公路| python 和asyncio的websocket/wamp客户端,虽然它有点工作,但也有一些零件不知道 。如何使用autobahn asyncio实现交互式websocket客户端?

我真正想要做的是在qt5/QML中实现WAMP,但这个 看起来像是一个更容易的路径。

这个简化的客户端大部分都是从网上复制过来的。它在onJoin发生时读取 时间服务。

我想要做的就是触发这个从外部来源读取。

我采取的复杂方法是​​在 线程中运行asyncio事件循环,然后通过套接字发送命令来触发读取。 I 到目前为止还无法确定在哪里放置例程/协程,以便可以从读者例行程序中找到它。

我怀疑有一个更简单的方法去做这件事,但我还没有发现它 呢。欢迎提出建议。

#!/usr/bin/python3 
try: 
    import asyncio 
except ImportError: 
    ## Trollius >= 0.3 was renamed 
    import trollius as asyncio 

from autobahn.asyncio import wamp, websocket 
import threading 
import time 
from socket import socketpair 

rsock, wsock = socketpair() 

def reader() : 
    data = rsock.recv(100) 
    print("Received:", data.decode()) 

class MyFrontendComponent(wamp.ApplicationSession): 
    def onConnect(self): 
     self.join(u"realm1") 



    @asyncio.coroutine 
    def onJoin(self, details): 
     print('joined') 
     ## call a remote procedure 
     ## 
     try: 
      now = yield from self.call(u'com.timeservice.now') 
     except Exception as e: 
      print("Error: {}".format(e)) 
     else: 
      print("Current time from time service: {}".format(now)) 



    def onLeave(self, details): 
     self.disconnect() 

    def onDisconnect(self): 
     asyncio.get_event_loop().stop() 



def start_aloop() : 
    loop = asyncio.new_event_loop() 
    asyncio.set_event_loop(loop) 
    transport_factory = websocket.WampWebSocketClientFactory(session_factory, 
        debug = False, 
        debug_wamp = False) 
    coro = loop.create_connection(transport_factory, '127.0.0.1', 8080) 
    loop.add_reader(rsock,reader) 
    loop.run_until_complete(coro) 
    loop.run_forever() 
    loop.close() 

if __name__ == '__main__': 
    session_factory = wamp.ApplicationSessionFactory() 
    session_factory.session = MyFrontendComponent 

    ## 4) now enter the asyncio event loop 
    print('starting thread') 
    thread = threading.Thread(target=start_aloop) 
    thread.start() 
    time.sleep(5) 
    print("IN MAIN") 
    # emulate an outside call 
    wsock.send(b'a byte string') 
+0

因此,您希望能够通过某种外部方式触发客户端对timeservice进行RPC调用? – dano 2014-11-03 02:26:17

回答

0

您可以监听事件循环内的插座异步,使用loop.sock_accept。你可以只调用一个协同程序设置的onConnectonJoin插座内:

try: 
    import asyncio 
except ImportError: 
    ## Trollius >= 0.3 was renamed 
    import trollius as asyncio 

from autobahn.asyncio import wamp, websocket 
import socket 

class MyFrontendComponent(wamp.ApplicationSession): 
    def onConnect(self): 
     self.join(u"realm1") 

    @asyncio.coroutine 
    def setup_socket(self): 
     # Create a non-blocking socket 
     self.sock = socket.socket() 
     self.sock.setblocking(0) 
     self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
     self.sock.bind(('localhost', 8889)) 
     self.sock.listen(5) 
     loop = asyncio.get_event_loop() 
     # Wait for connections to come in. When one arrives, 
     # call the time service and disconnect immediately. 
     while True: 
      conn, address = yield from loop.sock_accept(self.sock) 
      yield from self.call_timeservice() 
      conn.close() 

    @asyncio.coroutine 
    def onJoin(self, details): 
     print('joined') 
     # Setup our socket server 
     asyncio.async(self.setup_socket()) 

     ## call a remote procedure 
     ## 
     yield from self.call_timeservice() 

    @asyncio.coroutine 
    def call_timeservice(self): 
     try: 
      now = yield from self.call(u'com.timeservice.now') 
     except Exception as e: 
      print("Error: {}".format(e)) 
     else: 
      print("Current time from time service: {}".format(now)) 

    ... # The rest is the same 
0

感谢您的答复端午。不完全是我需要的解决方案,但它指出了我正确的方向。是的,我希望客户端可以通过外部触发器远程调用RPC。

我来到了,让我通过为特定调用的字符串(虽然只有一个实现现在)

这就是我想出了以下,虽然我不知道如何优雅它是。

import asyncio 
from autobahn.asyncio import wamp, websocket 
import threading 
import time 
import socket 


rsock, wsock = socket.socketpair() 

class MyFrontendComponent(wamp.ApplicationSession): 
    def onConnect(self): 
     self.join(u"realm1") 

    @asyncio.coroutine 
    def setup_socket(self): 
     # Create a non-blocking socket 
     self.sock = rsock 
     self.sock.setblocking(0) 
     loop = asyncio.get_event_loop() 
     # Wait for connections to come in. When one arrives, 
     # call the time service and disconnect immediately. 
     while True: 
      rcmd = yield from loop.sock_recv(rsock,80) 
      yield from self.call_service(rcmd.decode()) 

    @asyncio.coroutine 
    def onJoin(self, details): 
     # Setup our socket server 
     asyncio.async(self.setup_socket()) 


    @asyncio.coroutine 
    def call_service(self,rcmd): 
     print(rcmd) 
     try: 
      now = yield from self.call(rcmd) 
     except Exception as e: 
      print("Error: {}".format(e)) 
     else: 
      print("Current time from time service: {}".format(now)) 



    def onLeave(self, details): 
     self.disconnect() 

    def onDisconnect(self): 
     asyncio.get_event_loop().stop() 



def start_aloop() : 
    loop = asyncio.new_event_loop() 
    asyncio.set_event_loop(loop) 
    transport_factory = websocket.WampWebSocketClientFactory(session_factory, 
        debug = False, 
        debug_wamp = False) 
    coro = loop.create_connection(transport_factory, '127.0.0.1', 8080) 
    loop.run_until_complete(coro) 
    loop.run_forever() 
    loop.close() 

if __name__ == '__main__': 
    session_factory = wamp.ApplicationSessionFactory() 
    session_factory.session = MyFrontendComponent 

    ## 4) now enter the asyncio event loop 
    print('starting thread') 
    thread = threading.Thread(target=start_aloop) 
    thread.start() 
    time.sleep(5) 
    wsock.send(b'com.timeservice.now') 
    time.sleep(5) 
    wsock.send(b'com.timeservice.now') 
    time.sleep(5) 
    wsock.send(b'com.timeservice.now') 
+0

您可以使用'asyncio.sleep'和'loop.socket_sendall'以更多'asyncio'友好的方式实现'wsock'。这样你就不需要后台线程。相反,您只需使用'asyncio.async'来调度协程,该调用称为'asyncio.sleep(5)yield',接着是'loop.sock_sendall(wsock,b'com.timeservice.now')yield'。 – dano 2014-11-03 19:30:00

+0

谢谢,我会继续研究这个,但我只用了几天,但还没有完全达到速度。我将循环放在后台,因为这些代码在pyrorside中使用,前台函数由QML代码调用。可能有更好的方法来做到这一点,我怀疑它会及时变得更清晰。 – morris 2014-11-03 20:23:13

+0

啊,如果你打算把这个代码集成到一个不能插入'asyncio'事件循环的应用程序中,那么你在这里得到的是正确的想法。 – dano 2014-11-03 20:26:22