2015-02-09 56 views
3

我还没有找到任何解决方案来解决我的问题。我需要用两个线程创建一个python应用程序,每个线程都使用高速公路库连接到WAMP路由器。如何用两个线程创建一个具有高速应用程序的Python应用程序

关注我写我的实验代码:

wampAddress = 'ws://172.17.3.139:8181/ws' 
wampRealm = 's4t' 

from threading import Thread 
from autobahn.twisted.wamp import ApplicationRunner 
from autobahn.twisted.wamp import ApplicationSession 
from twisted.internet.defer import inlineCallbacks 


class AutobahnMRS(ApplicationSession): 
    @inlineCallbacks 
    def onJoin(self, details): 
     print("Sessio attached [Connect to WAMP Router]") 

     def onMessage(*args): 
      print args 
     try: 
      yield self.subscribe(onMessage, 'test') 
      print ("Subscribed to topic: test") 

     except Exception as e: 
      print("Exception:" +e) 

class AutobahnIM(ApplicationSession): 
    @inlineCallbacks 
    def onJoin(self, details): 
     print("Sessio attached [Connect to WAMP Router]") 

     try: 
      yield self.publish('test','YOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO') 
      print ("Subscribed to topic: test") 

     except Exception as e: 
      print("Exception:" +e) 

class ManageRemoteSystem: 
    def __init__(self): 
     self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm) 

    def start(self): 
     self.runner.run(AutobahnMRS); 


class InternalMessages: 
    def __init__(self): 
     self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm) 

    def start(self): 
     self.runner.run(AutobahnIM); 

#class S4tServer: 

if __name__ == '__main__': 
    server = ManageRemoteSystem() 
    sendMessage = InternalMessages() 

    thread1 = Thread(target = server.start()) 
    thread1.start() 
    thread1.join() 

    thread2 = Thread(target = sendMessage.start()) 
    thread2.start() 
    thread2.join() 

当我只启动了线程1时开始,后来当我杀的应用程序(CTRL-C)以下错误消息显示这条巨蟒的应用:

Sessio attached [Connect to WAMP Router] 
Subscribed to topic: test 
^CTraceback (most recent call last): 
    File "test_pub.py", line 71, in <module> 
    p2 = multiprocessing.Process(target = server.start()) 
    File "test_pub.py", line 50, in start 
    self.runner.run(AutobahnMRS); 
    File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/wamp.py", line 175, in run 
    reactor.run() 
    File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1191, in run 
    self.startRunning(installSignalHandlers=installSignalHandlers) 
    File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1171, in startRunning 
    ReactorBase.startRunning(self) 
    File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 683, in startRunning 
    raise error.ReactorNotRestartable() 
twisted.internet.error.ReactorNotRestartable 

我需要有它的功能一个应用程序来实现,也必须有传达给WAMP路由器高速公路Python库的系统。

换句话说,我需要一个能够与WAMP路由器通信的解决方案,但同时这个应用程序不必被高速公路部分阻塞(我认为解决方案是启动两个线程,一个线程管理一些功能,第二个线程管理高速公路部分)。

由于我之前提出的模式存在另一个问题,需要在'no autobahn thread'中的应用程序部分发送消息(在WAMP路由器的特定主题中),应该通过具有特定功能而不会阻碍其他功能。

我希望我已经给出了所有的细节。

非常感谢您的回复

--------------------------------编辑--- ------------------------------

经过一番研究,我已经实现了我需要的websocket协议,代码是如下:

# ----- twisted ---------- 
class _WebSocketClientProtocol(WebSocketClientProtocol): 
    def __init__(self, factory): 
     self.factory = factory 

    def onOpen(self): 
     #log.debug("Client connected") 
     self.factory.protocol_instance = self 
     self.factory.base_client._connected_event.set() 

class _WebSocketClientFactory(WebSocketClientFactory): 
    def __init__(self, *args, **kwargs): 
     WebSocketClientFactory.__init__(self, *args, **kwargs) 
     self.protocol_instance = None 
     self.base_client = None 

    def buildProtocol(self, addr): 
     return _WebSocketClientProtocol(self) 
# ------ end twisted ------- 
lass BaseWBClient(object): 

    def __init__(self, websocket_settings): 
     #self.settings = websocket_settings 
     # instance to be set by the own factory 
     self.factory = None 
     # this event will be triggered on onOpen() 
     self._connected_event = threading.Event() 
     # queue to hold not yet dispatched messages 
     self._send_queue = Queue.Queue() 
     self._reactor_thread = None 

    def connect(self): 

     log.msg("Connecting to host:port") 
     self.factory = _WebSocketClientFactory(
           "ws://host:port", 
           debug=True) 
     self.factory.base_client = self 

     c = connectWS(self.factory) 

     self._reactor_thread = threading.Thread(target=reactor.run, 
               args=(False,)) 
     self._reactor_thread.daemon = True 
     self._reactor_thread.start() 

    def send_message(self, body): 
     if not self._check_connection(): 
      return 
     log.msg("Queing send") 
     self._send_queue.put(body) 
     reactor.callFromThread(self._dispatch) 

    def _check_connection(self): 
     if not self._connected_event.wait(timeout=10): 
      log.err("Unable to connect to server") 
      self.close() 
      return False 
     return True 

    def _dispatch(self): 
     log.msg("Dispatching") 
     while True: 
      try: 
       body = self._send_queue.get(block=False) 
      except Queue.Empty: 
       break 
      self.factory.protocol_instance.sendMessage(body) 

    def close(self): 
     reactor.callFromThread(reactor.stop) 

import time 
def Ppippo(coda): 
     while True: 
      coda.send_message('YOOOOOOOO') 
      time.sleep(5) 

if __name__ == '__main__': 

    ws_setting = {'host':'', 'port':} 

    client = BaseWBClient(ws_setting) 

    t1 = threading.Thread(client.connect()) 
    t11 = threading.Thread(Ppippo(client)) 
    t11.start() 
    t1.start() 

以前的代码工作正常,但我需要翻译此操作WAMP协议insted websocket。

有谁知道我如何解决?

+0

移动'thread1.join()'下拉至与'thread2.join() '。在当前位置,它会告诉主线程等待,直到线程1死亡。由于你无法杀死线程(不用Ctrl-C杀死整个进程),所以第二个线程永远不会被创建。 – 2015-02-09 16:51:04

+0

另外,你的线程应该在线程的'.run()'函数中完成他们的工作。你在主函数的末尾加入()一个线程,以允许线程在主应用程序退出之前完成执行。所以你需要让线程完成任务。 – 2015-02-09 16:56:44

+0

你需要2个应用程序会话,还是真的有2个线程?如果前者是同一个路由器/领域或不同的路由器?如果是后者,为什么你首先需要线程?如果你需要做CPU密集的东西,并希望使用多核心,请让我们知道。需要更多的“为什么”和“什么”... – oberstet 2015-02-09 19:54:32

回答

5

坏消息是Autobahn正在使用Twisted主循环,所以你不能一次在两个线程中运行它。

好消息是,你不需要需要在两个线程中运行它来运行两件事情,而且无论如何两个线程会更复杂。

开始使用多个应用程序的API有点令人困惑,因为您有两个ApplicationRunner对象,乍看之下,您在高速公路上运行应用程序的方式是致电ApplicationRunner.run

但是,ApplicationRunner只是一种方便,它包装了设置应用程序的东西以及运行主循环的东西;工作的真正肉发生在WampWebSocketClientFactory

为了实现你想要的,你只需要摆脱线程,并自己运行主循环,使实例简单地设置他们的应用程序。

为了实现这一点,你需要改变你的程序做这个的最后一部分:

class ManageRemoteSystem: 
    def __init__(self): 
     self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm) 

    def start(self): 
     # Pass start_reactor=False to all runner.run() calls 
     self.runner.run(AutobahnMRS, start_reactor=False) 


class InternalMessages: 
    def __init__(self): 
     self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm) 

    def start(self): 
     # Same as above 
     self.runner.run(AutobahnIM, start_reactor=False) 


if __name__ == '__main__': 
    server = ManageRemoteSystem() 
    sendMessage = InternalMessages() 
    server.start() 
    sendMessage.start() 

    from twisted.internet import reactor 
    reactor.run() 
+1

这个API有点吸引人,尤其是这个用例。我们在另一个repo中有未发布的东西,它允许通过单个调用启动多个会话,并返回一个“DeferredList”(可以用单独的WAMP应用会话来解决)。可能这应该在Autobahn .. – oberstet 2015-02-09 19:55:48

+0

对不起,能不能更准确地说明这一点 – alotronto 2015-02-23 15:30:18

+1

谢谢@Glyph !!!我一直在寻找这个'start_reactor'参数的年龄,但似乎在文档中没有提及它......或者其他人应该如何将autobahn添加到现有的Twisted应用程序中? – jjmontes 2015-09-10 21:37:00

相关问题