2017-08-09 43 views
1

我想通过使用crossbar/autobahn的RPC的websockets传输大数据。我的设置是如下:从RPC(Crossbar + Autobahn | Python)返回大数据

  • 的Python 2.7
  • 横杆路由器(17.8.1.post1版)
  • 后端会试图发送一个大的熊猫数据帧作为一个JSON字符串
  • 想要接收此字符串的前端

实质上,我的前端尝试调用一个将返回大字符串的函数。

class MyComponent(ApplicationSession): 

@inlineCallbacks 
def onJoin(self, details): 
    print("session ready") 
    try: 
     res = yield self.call(u'data.get') 

而且我得到这个错误:

2017-08-09T16:38:10+0200 session closed with reason wamp.close.transport_lost [WAMP transport was lost without closing the session before] 
2017-08-09T16:38:10+0200 Cancelling 1 outstanding requests 
2017-08-09T16:38:10+0200 call error: ApplicationError(error=<wamp.close.transport_lost>, args=[u'WAMP transport was lost without closing the session before'], kwargs={}, enc_algo=None) 

看来横杆踢我,因为我的客户端会话看起来死他,但我认为,高速公路将大块我的数据和呼叫会不阻止客户端反应器。

我在crossbar配置中启用了一些改进websocket处理的东西;多亏了我能够传输更多的数据,但最终我会达到一个极限(配置文件主要是复制并粘贴最大sam &)。

     "options": { 
          "enable_webstatus": false, 
          "max_frame_size": 16777216, 
          "auto_fragment_size": 65536, 
          "fail_by_drop": true, 
          "open_handshake_timeout": 2500, 
          "close_handshake_timeout": 1000, 
          "auto_ping_interval": 10000, 
          "auto_ping_timeout": 5000, 
          "auto_ping_size": 4, 
          "compression": { 
           "deflate": { 
            "request_no_context_takeover": false, 
            "request_max_window_bits": 11, 
            "no_context_takeover": false, 
            "max_window_bits": 11, 
            "memory_level": 4 
           } 
          } 
         } 

任何想法,需要,我做错了什么?

谢谢


客户端代码:

from __future__ import print_function 
import pandas as pd 

from autobahn.twisted.wamp import ApplicationSession 
from twisted.internet.defer import inlineCallbacks 


class MyComponent(ApplicationSession): 

    @inlineCallbacks 
    def onJoin(self, details): 
     print("session ready") 
     try: 
      res = yield self.call(u'data.get') 
      print('Got the data') 
      data = pd.read_json(res) 
      print("call result: {}".format(data.head())) 
      print("call result shape: {0}, {1}".format(*data.shape)) 
     except Exception as e: 
      print("call error: {0}".format(e)) 


if __name__ == "__main__": 
    from autobahn.twisted.wamp import ApplicationRunner 

    runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1") 
    runner.run(MyComponent) 

后端代码

from __future__ import absolute_import, division, print_function 

from twisted.internet.defer import inlineCallbacks 
from autobahn.twisted.wamp import ApplicationSession 
from twisted.internet import reactor, defer, threads 

# Imports 
import pandas as pd 


def get_data(): 
    """Returns a DataFrame of stuff as a JSON 

    :return: str, data as a JSON string 
    """ 
    data = pd.DataFrame({ 
     'col1': pd.np.arange(1000000), 
     'col2': "I'm big", 
     'col3': 'Like really big', 
    }) 
    print("call result shape: {0}, {1}".format(*data.shape)) 
    print(data.memory_usage().sum()) 
    print(data.head()) 
    return data.to_json() 


class MyBackend(ApplicationSession): 

    def __init__(self, config): 
     ApplicationSession.__init__(self, config) 

    @inlineCallbacks 
    def onJoin(self, details): 

     # Register a procedure for remote calling 
     @inlineCallbacks 
     def async_daily_price(eqt_list): 
      res = yield threads.deferToThread(get_data) 
      defer.returnValue(res) 

     yield self.register(async_daily_price, u'data.get') 


if __name__ == "__main__": 
    from autobahn.twisted.wamp import ApplicationRunner 

    runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1") 
    runner.run(MyBackend) 

配置

{ 
"version": 2, 
"controller": {}, 
"workers": [ 
    { 
     "type": "router", 
     "realms": [ 
      { 
       "name": "realm1", 
       "roles": [ 
        { 
         "name": "anonymous", 
         "permissions": [ 
          { 
           "uri": "", 
           "match": "prefix", 
           "allow": { 
            "call": true, 
            "register": true, 
            "publish": true, 
            "subscribe": true 
           }, 
           "disclose": { 
            "caller": false, 
            "publisher": false 
           }, 
           "cache": true 
          } 
         ] 
        } 
       ] 
      } 
     ], 
     "transports": [ 
      { 
       "type": "universal", 
       "endpoint": { 
        "type": "tcp", 
        "port": 8080 
       }, 
       "rawsocket": { 
       }, 
       "websocket": { 
        "ws": { 
         "type": "websocket", 
         "options": { 
          "enable_webstatus": false, 
          "max_frame_size": 16777216, 
          "auto_fragment_size": 65536, 
          "fail_by_drop": true, 
          "open_handshake_timeout": 2500, 
          "close_handshake_timeout": 1000, 
          "auto_ping_interval": 10000, 
          "auto_ping_timeout": 5000, 
          "auto_ping_size": 4, 
          "compression": { 
           "deflate": { 
            "request_no_context_takeover": false, 
            "request_max_window_bits": 11, 
            "no_context_takeover": false, 
            "max_window_bits": 11, 
            "memory_level": 4 
           } 
          } 
         } 
        } 
       }, 
       "web": { 
        "paths": { 
         "/": { 
          "type": "static", 
          } 
         } 
        } 
       } 
      ] 
     } 
    ] 
} 

回答

0

crossbar.io组建议的解决方案是使用RPC的渐进结果选项。

一个完整的工作示例位于https://github.com/crossbario/autobahn-python/tree/master/examples/twisted/wamp/rpc/progress

在我的代码我不得不添加的结果的分块在后端

 step = 10000 
     if details.progress and len(res) > step: 
      for i in xrange(0, len(res), step): 
       details.progress(res[i:i+step]) 
     else: 
      defer.returnValue(res) 

而且给调用者

 res = yield self.call(
      u'data.get' 
      options=CallOptions(
       on_progress=partial(on_progress, res=res_list) 
      ) 
     ) 

我的函数on_progress将结果添加到结果列表中

def on_progress(x, res): 
    res.append(x) 

选择正确的块大小将做到这一点。