
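I am trying to transfer large data over websockets using crossbar/autobahn's RPC. My setup is as follows:

  • Python 2.7
  • A crossbar router (version 17.8.1.post1)
  • A back-end that will try to send a large pandas DataFrame as a JSON string
  • A front-end that will want to receive this string

In essence, my front-end is trying to call a function that returns a large string.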

class MyComponent(ApplicationSession):

    @inlineCallbacks
    def onJoin(self, details):
        print("session ready")
        try:
            res = yield self.call(u'data.get')
        except Exception as e:
            print("call error: {0}".format(e))

I get this error:

2017-08-09T16:38:10+0200 session closed with reason wamp.close.transport_lost [WAMP transport was lost without closing the session before]
2017-08-09T16:38:10+0200 Cancelling 1 outstanding requests
2017-08-09T16:38:10+0200 call error: ApplicationError(error=<wamp.close.transport_lost>, args=[u'WAMP transport was lost without closing the session before'], kwargs={}, enc_algo=None)

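It seems crossbar is kicking me out because my client session looks dead to it, but I thought autobahn would chunk my data and that the call would not block the client reactor.

I enabled a few things in my crossbar configuration to improve websocket handling; thanks to that I was able to transmit larger amounts of data, but eventually I still hit a limit (the config file is mostly copied and pasted from sam & max).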

                        "options": {
                            "enable_webstatus": false,
                            "max_frame_size": 16777216,
                            "auto_fragment_size": 65536,
                            "fail_by_drop": true,
                            "open_handshake_timeout": 2500,
                            "close_handshake_timeout": 1000,
                            "auto_ping_interval": 10000,
                            "auto_ping_timeout": 5000,
                            "auto_ping_size": 4,
                            "compression": {
                                "deflate": {
                                    "request_no_context_takeover": false,
                                    "request_max_window_bits": 11,
                                    "no_context_takeover": false,
                                    "max_window_bits": 11,
                                    "memory_level": 4
                               }
                            }
                        }
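
As a rough sanity check (not a diagnosis), the size of the payload can be compared with the configured "max_frame_size". The sketch below assumes that data.to_json() produces compact, column-oriented JSON of the form {"col1":{"0":0,...},...}; the helper names are hypothetical, and it only counts uncompressed characters, so it does not show which setting actually triggers the disconnect.

```python
import json

MAX_FRAME_SIZE = 16777216  # value of "max_frame_size" in the options above


def compact_json(columns):
    """Serialize without whitespace (assumed to match the to_json() layout)."""
    return json.dumps(columns, separators=(",", ":"))


def make_columns(n_rows):
    """Build the three columns of the backend DataFrame as nested dicts."""
    index = [str(i) for i in range(n_rows)]
    return {
        "col1": dict((k, i) for i, k in enumerate(index)),
        "col2": dict((k, "I'm big") for k in index),
        "col3": dict((k, "Like really big") for k in index),
    }


def estimated_size(n_rows):
    """Length of compact_json(make_columns(n_rows)), computed without building it."""
    key_digits = sum(len(str(i)) for i in range(n_rows))
    commas = max(n_rows - 1, 0)

    def column_size(total_value_len):
        # Each entry is "<i>":<value> (2 quotes + 1 colon), entries are
        # separated by commas, and the column is wrapped in 2 braces.
        return key_digits + 3 * n_rows + total_value_len + commas + 2

    col1 = column_size(key_digits)  # values 0..n-1 have the same digits as the keys
    col2 = column_size(n_rows * len(json.dumps("I'm big")))
    col3 = column_size(n_rows * len(json.dumps("Like really big")))
    # Each column name contributes "colN": (quoted name plus a colon).
    names = sum(len(json.dumps(name)) + 1 for name in ("col1", "col2", "col3"))
    # 2 outer braces + 2 commas between the three columns.
    return 2 + 2 + names + col1 + col2 + col3


if __name__ == "__main__":
    print(estimated_size(1000000), MAX_FRAME_SIZE)
```

Under these assumptions the serialized string for 1000000 rows is larger than the configured frame size, which is consistent with smaller frames working and larger ones failing.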

Any ideas, takes, or things that I am doing wrong?

Thank you,


Client code:

from __future__ import print_function
import pandas as pd

from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks


class MyComponent(ApplicationSession):

    @inlineCallbacks
    def onJoin(self, details):
        print("session ready")
        try:
            res = yield self.call(u'data.get')
            print('Got the data')
            data = pd.read_json(res)
            print("call result: {}".format(data.head()))
            print("call result shape: {0}, {1}".format(*data.shape))
        except Exception as e:
            print("call error: {0}".format(e))


if __name__ == "__main__":
    from autobahn.twisted.wamp import ApplicationRunner

    runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1")
    runner.run(MyComponent)

Backend code:

from __future__ import absolute_import, division, print_function

from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet import reactor, defer, threads

# Imports
import numpy as np
import pandas as pd


def get_data():
    """Returns a DataFrame of stuff as a JSON

    :return: str, data as a JSON string
    """
    data = pd.DataFrame({
        'col1': np.arange(1000000),
        'col2': "I'm big",
        'col3': 'Like really big',
    })
    print("call result shape: {0}, {1}".format(*data.shape))
    print(data.memory_usage().sum())
    print(data.head())
    return data.to_json()


class MyBackend(ApplicationSession):

    def __init__(self, config):
        ApplicationSession.__init__(self, config)

    @inlineCallbacks
    def onJoin(self, details):

        # Register a procedure for remote calling; the caller passes no arguments
        @inlineCallbacks
        def async_daily_price():
            # Build the DataFrame in a worker thread so the reactor is not blocked
            res = yield threads.deferToThread(get_data)
            defer.returnValue(res)

        yield self.register(async_daily_price, u'data.get')


if __name__ == "__main__":
    from autobahn.twisted.wamp import ApplicationRunner

    runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1")
    runner.run(MyBackend)

Configuration:

{
"version": 2,
"controller": {},
"workers": [
    {
        "type": "router",
        "realms": [
            {
                "name": "realm1",
                "roles": [
                    {
                        "name": "anonymous",
                        "permissions": [
                            {
                                "uri": "",
                                "match": "prefix",
                                "allow": {
                                    "call": true,
                                    "register": true,
                                    "publish": true,
                                    "subscribe": true
                                },
                                "disclose": {
                                    "caller": false,
                                    "publisher": false
                                },
                                "cache": true
                            }
                        ]
                    }
                ]
            }
        ],
        "transports": [
            {
                "type": "universal",
                "endpoint": {
                    "type": "tcp",
                    "port": 8080
                },
                "rawsocket": {
                },
                "websocket": {
                    "ws": {
                        "type": "websocket",
                        "options": {
                            "enable_webstatus": false,
                            "max_frame_size": 16777216,
                            "auto_fragment_size": 65536,
                            "fail_by_drop": true,
                            "open_handshake_timeout": 2500,
                            "close_handshake_timeout": 1000,
                            "auto_ping_interval": 10000,
                            "auto_ping_timeout": 5000,
                            "auto_ping_size": 4,
                            "compression": {
                                "deflate": {
                                    "request_no_context_takeover": false,
                                    "request_max_window_bits": 11,
                                    "no_context_takeover": false,
                                    "max_window_bits": 11,
                                    "memory_level": 4
                               }
                            }
                        }
                    }
                },
                "web": {
                    "paths": {
                        "/": {
                            "type": "static"
                        }
                    }
                }
            }
        ]
    }
]
}
1 Answer

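The solution suggested by the crossbar.io group was to use the progressive results option of the RPC.

A complete working example is located at https://github.com/crossbario/autobahn-python/tree/master/examples/twisted/wamp/rpc/progress

In my code, I had to add chunking of the result in the backend: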

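        step = 10000  # chunk size, in characters
        if details.progress and len(res) > step:
            # The caller asked for progressive results: send the payload in pieces.
            # Nothing further is returned once the loop ends.
            for i in xrange(0, len(res), step):
                details.progress(res[i:i+step])
        else:
            # Small payload, or no progress requested: return it in one piece.
            defer.returnValue(res)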

And in the caller:

        res = yield self.call(
            u'data.get',
            options=CallOptions(
                on_progress=partial(on_progress, res=res_list)
            )
        )

My function on_progress appends each chunk to a result list:

def on_progress(x, res):
    res.append(x)

Picking the right chunk size does the trick.
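
The chunk-and-reassemble logic can be tried without a router. The following is a minimal sketch in plain Python, not the autobahn API itself: send_in_chunks and iter_chunks are hypothetical helpers standing in for the loop over details.progress, and the collected chunks are assumed to be rejoined with "".join on the caller side.

```python
from functools import partial


def iter_chunks(payload, step):
    """Yield consecutive slices of `payload`, each at most `step` long."""
    for i in range(0, len(payload), step):
        yield payload[i:i + step]


def send_in_chunks(payload, progress, step=10000):
    """Callee side: hand `payload` to `progress` piece by piece.

    `progress` stands in for details.progress. Returns the number of chunks sent.
    """
    count = 0
    for chunk in iter_chunks(payload, step):
        progress(chunk)
        count += 1
    return count


def on_progress(x, res):
    """Caller side: collect each received chunk in the list `res`."""
    res.append(x)


if __name__ == "__main__":
    payload = '{"col1": "' + "x" * 25000 + '"}'
    res_list = []
    sent = send_in_chunks(payload, partial(on_progress, res=res_list), step=10000)
    # Joining the chunks in arrival order restores the original string.
    assert "".join(res_list) == payload
    print(sent, len(payload))
```

Because the chunks arrive as plain strings, the caller would join them before passing the result to pd.read_json.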

answered 2017-08-10T16:25:43.333