3

我将 Autobahn 0.9.2 与 Python 3.4 与 asyncio 一起使用。

问题:使用 WAMP,是否可以从 RPC 端点内部访问充当呼叫者的 IP 和 HTTP 连接标头的对等方?建立连接时是否保留此信息?如果没有,我将如何开始扩展一些工厂来支持这一点?

我的目标很简单:我想要一个 RPC 端点来地理定位连接的对等点(调用者)的 IP,并将增强的数据中继到 Redis。我已经阅读了源代码并且知道信息从哪里通过(autobahn.websocket.protocol.WebSocketServerProtocol -> onConnect(request)),但是我无法从 onJoin 回调中定义的 ApplicationSession 的 RPC 端点向下钻取它。我尝试遍历传输/路由器/路由器会话链并没有设法到达那里。我对初始连接请求中的对等 IP 和 HTTP 标头都感兴趣。

这是蒸馏的组件:

class IncomingComponent(ApplicationSession):

def __init__(self, **params):
    super().__init__()
    self.redis = StrictRedis(host=config["redis"]["host"], port=config["redis"]["port"], db=config["redis"]["databases"]["ailytics"])

def onConnect(self):
    self.join("abc")

@asyncio.coroutine
def onJoin(self, details):

    def geolocalize_and_store_event(event, detail):
        # Geolocalize here! Have access to caller ID through detail
        self.redis.rpush("abc:events", json.dumps(event))

    yield from self.register(
        geolocalize_and_store_event,
        "abc.geolocalize_and_store_event",
        options=RegisterOptions(details_arg='detail', discloseCaller = True)
    )

以及服务器的初始化:

    router_factory = wamp.RouterFactory()

    session_factory = wamp.RouterSessionFactory(router_factory)
    session_factory.add(IncomingComponent())

    transport_factory = websocket.WampWebSocketServerFactory(session_factory, debug=False, debug_wamp=False)

    loop = asyncio.get_event_loop()
    coro = loop.create_server(transport_factory, '0.0.0.0', 7788)
    server = loop.run_until_complete(coro)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.close()
        loop.close()
4

2 回答 2

2

wamp.session.get您至少可以通过 crossbar.io 中的元 API 访问额外的会话/传输信息:

@inlineCallbacks
def onJoin(self, ign):

    @inlineCallbacks
    def method(details):
        session = yield self.call('wamp.session.get', details.caller)
        peer = session['transport']['peer']
        print "peer's address", peer

        headers = session['transport']['http_headers_received']
        print "headers:"
        print '\n'.join(['{}: {}'.format(k, v) for (k, v) in headers.items()])

    yield self.register(
        method, 'some_method',
        types.RegisterOptions(details_arg='details'),
    )
于 2015-11-26T21:05:17.667 回答
1

如@oberstet 所述,该功能尚不存在,但由于使用工厂模式的高速公路/WAMP 的东西,我能够在不更改库代码的情况下提出解决方案。

子类有 3 个组件:

首先,我们在 wamp.RouterSession 的子类中添加一个 ipAddress 实例变量

class IncomingServerSession(wamp.RouterSession):

    def __init__(self, routerFactory):
        super().__init__(routerFactory)
        self.ipAddress = None

然后我们让 wamp.RouterSessionFactory 子类使用 IncomingServerSession

class IncomingServerSessionFactory(wamp.RouterSessionFactory):
    session = IncomingServerSession

最后,我们继承 websocket.WampWebSocketServerProtocol 并设置 ipAddress 实例变量。由于我们在 onOpen 回调中,我们可以访问对等点和 HTTP 标头。我的服务器是反向代理的,所以我正在寻找对等方的自定义 HTTP 标头。

class IncomingServerProtocol(websocket.WampWebSocketServerProtocol):
    def onOpen(self):
        try:
            self._session = self.factory._factory()

            # Use your own header or just the peer if not reverse-proxied
            self._session.ipAddress = (self.http_headers.get('x-real-ip') or self.peer) 

            self._session.onOpen(self)

        except Exception as e:
            if self.factory.debug_wamp:
                traceback.print_exc()
                # # Exceptions raised in onOpen are fatal ..
                reason = "WAMP Internal Error ({})".format(e)
                self._bailout(protocol.WebSocketProtocol.CLOSE_STATUS_CODE_INTERNAL_ERROR, reason=reason)

以下是我们在 RPC 调用中访问对等方 IP 的方式:

@asyncio.coroutine
def onJoin(self, details):
    def event(e, details):
        caller_session_id = details.caller
        caller_session = self._transport._router._dealer._session_id_to_session[caller_session_id]

        print(caller_session.ipAddress)

    #discloseCaller needs to be True
    yield from self.register(event, "abc.event", options=RegisterOptions(details_arg='details', discloseCaller=True))

最后,我们需要更新初始化代码以使用我们的子类:

router_factory = wamp.RouterFactory()

session_factory = IncomingServerSessionFactory(router_factory)
session_factory.add(IncomingComponent())

transport_factory = websocket.WampWebSocketServerFactory(session_factory, debug=False, debug_wamp=False)
transport_factory.protocol = IncomingServerProtocol

loop = asyncio.get_event_loop()
coro = loop.create_server(transport_factory, '0.0.0.0', 7788)
server = loop.run_until_complete(coro)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    server.close()
    loop.close()

在获得官方支持之前,您就是这样做的!

于 2014-11-04T22:17:37.893 回答