18

我目前正在使用aiohttp来查看它如何作为具有 websocket 连接的移动应用程序的服务器应用程序执行。

这是简单的“Hello world”示例(此处为要点):

import asyncio
import aiohttp
from aiohttp import web


class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        print('Connection opened')
        try:
            while True:
                msg = yield from ws.receive()
                ws.send_str(msg.data + '/answer')
        except:
            pass
        finally:
            print('Connection closed')
        return ws


if __name__ == "__main__":
    app = aiohttp.web.Application()
    app.router.add_route('GET', '/ws', WebsocketEchoHandler())

    loop = asyncio.get_event_loop()
    handler = app.make_handler()

    f = loop.create_server(
        handler,
        '127.0.0.1',
        8080,
    )

    srv = loop.run_until_complete(f)
    print("Server started at {sock[0]}:{sock[1]}".format(
        sock=srv.sockets[0].getsockname()
    ))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(handler.finish_connections(1.0))
        srv.close()
        loop.run_until_complete(srv.wait_closed())
        loop.run_until_complete(app.finish())
    loop.close()

问题

现在我想使用下面描述的结构(节点服务器 = python aiohttp)。更具体地说,使用带有asyncio-redis的Redis Pub/Sub机制在我的WebsocketEchoHandler中读取和写入 websocket 连接和 Redis 。

WebsocketEchoHandler是一个简单的循环,所以我不知道应该怎么做。使用Tornadobrükva我只会使用回调。

http://goldfirestudios.com/blog/136/Horizo​​ntally-Scaling-Node.js-and-WebSockets-with-Redis

额外的(也许是离题的)问题

由于我已经在使用Redis,我应该采用以下两种方法中的哪一种:

  1. 就像在“经典”网络应用程序中一样,对所有内容都有一个控制器/视图,将Redis用于消息传递等。
  2. Web 应用程序应该只是客户端和Redis之间的一层,也用作任务队列(最简单的Python RQ)。每个请求都应该委托给工人。

编辑

图片来自http://goldfirestudios.com/blog/136/Horizo​​ntally-Scaling-Node.js-and-WebSockets-with-Redis

编辑 2

看来我需要澄清一下。

  • Websocket-only 处理程序如上所示
  • Redis Pub/Sub 处理程序可能如下所示:

    class WebsocketEchoHandler:
    
        @asyncio.coroutine
        def __call__(self, request):
            ws = web.WebSocketResponse()
            ws.start(request)
    
            connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
            subscriber = yield from connection.start_subscribe()
            yield from subscriber.subscribe(['ch1', 'ch2'])
    
            print('Connection opened')
            try:
                while True:
                    msg = yield from subscriber.next_published()
                    ws.send_str(msg.value + '/answer')
            except:
                pass
            finally:
                print('Connection closed')
            return ws
    

    此处理程序仅订阅 Redis 通道ch1ch2,并将从这些通道接收到的每个消息发送到 websocket。

  • 我想要这个处理程序:

    class WebsocketEchoHandler:
    
        @asyncio.coroutine
        def __call__(self, request):
            ws = web.WebSocketResponse()
            ws.start(request)
    
            connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
            subscriber = yield from connection.start_subscribe()
            yield from subscriber.subscribe(['ch1', 'ch2'])
    
            print('Connection opened')
            try:
                while True:
                    # If message recived from redis OR from websocket
                    msg_ws = yield from ws.receive()
                    msg_redis = yield from subscriber.next_published()
                    if msg_ws:
                        # push to redis / do something else
                        self.on_msg_from_ws(msg_ws)
                    if msg_redis:
                        self.on_msg_from_redis(msg_redis)
            except:
                pass
            finally:
                print('Connection closed')
            return ws
    

    但是以下代码总是按顺序调用,因此从 websocket 读取会阻止从 Redis 读取:

    msg_ws = yield from ws.receive()
    msg_redis = yield from subscriber.next_published()
    

我希望在事件是从两个来源之一收到消息的事件上完成阅读。

4

2 回答 2

24

您应该使用两个while循环 - 一个处理来自 websocket 的消息,另一个处理来自 redis 的消息。您的主处理程序可以启动两个协程,一个处理每个循环,然后等待它们

class WebsocketEchoHandler:
    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
        subscriber = yield from connection.start_subscribe()
        yield from subscriber.subscribe(['ch1', 'ch2'])

        print('Connection opened')
        try:
            # Kick off both coroutines in parallel, and then block
            # until both are completed.
            yield from asyncio.gather(self.handle_ws(ws), self.handle_redis(subscriber))
        except Exception as e:  # Don't do except: pass
            import traceback
            traceback.print_exc()
        finally:
            print('Connection closed')
        return ws

    @asyncio.coroutine
    def handle_ws(self, ws):
        while True:
            msg_ws = yield from ws.receive()
            if msg_ws:
                self.on_msg_from_ws(msg_ws)

    @asyncio.coroutine
    def handle_redis(self, subscriber):
        while True:
            msg_redis = yield from subscriber.next_published()
            if msg_redis:
                self.on_msg_from_redis(msg_redis)

通过这种方式,您可以从两个潜在来源中的任何一个进行阅读,而不必关心另一个。

于 2015-07-28T18:46:57.550 回答
1

最近我们可以在 python 3.5 及更高版本中使用 async await ..

async def task1(ws):
    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            data = msg.data
            print(data)
            if data:
                await ws.send_str('pong')
## ch is a redis channel
async def task2(ch):
    async for msg in ch1.iter(encoding="utf-8", decoder=json.loads):
        print("receving", msg)
        user_token = msg['token']
        if user_token in r_cons.keys():
            _ws = r_cons[user_token]
            await  _ws.send_json(msg)

coroutines = list()
coroutines.append(task1(ws))
coroutines.append(task2(ch1))

await asyncio.gather(*coroutines)

这就是我所做的。当 websockets 需要等待来自多源的消息时。

这里的要点是使用 asyncio.gather 像@dano 提到的那样一起运行两个 corotine。

于 2018-05-22T06:47:54.177 回答