5

aiohttp内置了对websockets的支持。它非常简单并且效果很好。

文档中示例的简化版本是:

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Async iterate the messages the client sends
    async for message in ws:
        ws.send_str('You sent: %s' % (message.data,))

    print('websocket connection closed')

在示例中,ws是对与客户端的 websocket 连接的引用。我可以轻松地将这些引用放入request.app,就像@Crandel 在这里所做的那样(即全局状态),但不是在生产应用程序中,因为每个应用程序服务器(甚至每个工作人员)都会有自己的app实例。

这有一个公认的模式吗?还有其他方法吗?

注意:我不是指会话。我指的是连接。当服务器 B 的应用程序代码中发生事件时,我想向连接到服务器 A 的客户端发送消息等。

4

3 回答 3

4

如果我对您的理解正确,您希望拥有多个 websocket 服务器,每个服务器都连接多个客户端,但您希望能够与所有连接的客户端进行潜在通信。

这是一个创建三个普通服务器的示例——一个大写回显、一个随机报价和一天中的时间——然后向所有连接的客户端发送一条广播消息。也许这有一些有用的想法。

粘贴箱:https ://pastebin.com/xDSACmdV

#!/usr/bin/env python3
"""
Illustrates how to have multiple websocket servers running and send
messages to all their various clients at once.

In response to stackoverflow question:
https://stackoverflow.com/questions/35820782/how-to-manage-websockets-across-multiple-servers-workers

Pastebin: https://pastebin.com/xDSACmdV
"""
import asyncio
import datetime
import random
import time
import webbrowser

import aiohttp
from aiohttp import web

__author__ = "Robert Harder"
__email__ = "rob@iharder.net"
__license__ = "Public Domain"


def main():
    # Create servers
    cap_srv = CapitalizeEchoServer(port=9990)
    rnd_srv = RandomQuoteServer(port=9991)
    tim_srv = TimeOfDayServer(port=9992)

    # Queue their start operation
    loop = asyncio.get_event_loop()
    loop.create_task(cap_srv.start())
    loop.create_task(rnd_srv.start())
    loop.create_task(tim_srv.start())

    # Open web pages to test them
    webtests = [9990, 9991, 9991, 9992, 9992]
    for port in webtests:
        url = "http://www.websocket.org/echo.html?location=ws://localhost:{}".format(port)
        webbrowser.open(url)
    print("Be sure to click 'Connect' on the webpages that just opened.")

    # Queue a simulated broadcast-to-all message
    def _alert_all(msg):
        print("Sending alert:", msg)
        msg_dict = {"alert": msg}
        cap_srv.broadcast_message(msg_dict)
        rnd_srv.broadcast_message(msg_dict)
        tim_srv.broadcast_message(msg_dict)

    loop.call_later(17, _alert_all, "ALL YOUR BASE ARE BELONG TO US")

    # Run event loop
    loop.run_forever()


class MyServer:
    def __init__(self, port):
        self.port = port  # type: int
        self.loop = None  # type: asyncio.AbstractEventLoop
        self.app = None  # type: web.Application
        self.srv = None  # type: asyncio.base_events.Server

    async def start(self):
        self.loop = asyncio.get_event_loop()
        self.app = web.Application()
        self.app["websockets"] = []  # type: [web.WebSocketResponse]
        self.app.router.add_get("/", self._websocket_handler)
        await self.app.startup()
        handler = self.app.make_handler()
        self.srv = await asyncio.get_event_loop().create_server(handler, port=self.port)
        print("{} listening on port {}".format(self.__class__.__name__, self.port))

    async def close(self):
        assert self.loop is asyncio.get_event_loop()
        self.srv.close()
        await self.srv.wait_closed()

        for ws in self.app["websockets"]:  # type: web.WebSocketResponse
            await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')

        await self.app.shutdown()
        await self.app.cleanup()

    async def _websocket_handler(self, request):
        assert self.loop is asyncio.get_event_loop()
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        self.app["websockets"].append(ws)

        await self.do_websocket(ws)

        self.app["websockets"].remove(ws)
        return ws

    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            pass

    def broadcast_message(self, msg: dict):
        for ws in self.app["websockets"]:  # type: web.WebSocketResponse
            ws.send_json(msg)


class CapitalizeEchoServer(MyServer):
    """ Echoes back to client whatever they sent, but capitalized. """

    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            cap = ws_msg.data.upper()
            ws.send_str(cap)


class RandomQuoteServer(MyServer):
    """ Sends a random quote to the client every so many seconds. """
    QUOTES = ["Wherever you go, there you are.",
              "80% of all statistics are made up.",
              "If a tree falls in the woods, and no one is around to hear it, does it make a noise?"]

    def __init__(self, interval: float = 10, *kargs, **kwargs):
        super().__init__(*kargs, **kwargs)
        self.interval = interval

    async def do_websocket(self, ws: web.WebSocketResponse):
        async def _regular_interval():
            while self.srv.sockets is not None:
                quote = random.choice(RandomQuoteServer.QUOTES)
                ws.send_json({"quote": quote})
                await asyncio.sleep(self.interval)

        self.loop.create_task(_regular_interval())

        await super().do_websocket(ws)  # leave client connected here indefinitely


class TimeOfDayServer(MyServer):
    """ Sends a message to all clients simultaneously about time of day. """

    async def start(self):
        await super().start()

        async def _regular_interval():
            while self.srv.sockets is not None:
                if int(time.time()) % 10 == 0:  # Only on the 10 second mark
                    timestamp = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
                    self.broadcast_message({"timestamp": timestamp})
                await asyncio.sleep(1)

        self.loop.create_task(_regular_interval())


if __name__ == "__main__":
    main()
于 2017-02-15T15:22:54.247 回答
3

所以我只熟悉 Node 中的 Socket.IO,但使用 Socket.IO 水平扩展 websocket 相当容易。

套接字可以随会话一起提供,因此每个会话都由特定的服务器管理。这样可以轻松保存每个打开的套接字的状态,并在所有服务器之间进行负载平衡。

这是 Python 的 SocketIO:

https://pypi.python.org/pypi/socketIO-client

这是关于如何将会话附加到 redis-store 以使其更快和更易于管理的跨服务器负载平衡的非常好的读物。

如何与 Socket.IO 1.x 和 Express 4.x 共享会话?

我知道这不能回答你关于 aiohttp 的问题,但希望这能让你更好地了解套接字是如何工作的。

编辑:写在节点-

在 Socket.IO 中,这真的很简单,它有很多函数可以以各种不同的方式广播消息。

对于您的示例,如果您想向每个聊天室中的每个人发送消息。示例每个打开套接字的人都可以轻松编写。

socket.broadcast.emit('WARNING', "this is a test");

假设您有开放的房间,您可以使用一个名为 的简单函数仅向该房间中的人广播消息.to()。示例我有一个名为“BBQ”的房间:

socket.broadcast.to('BBQ').emit('invitation', 'Come get some food!');

这将向频道烧烤中的每个人发送消息 - 来吃点东西吧!

编辑:编辑:

这是一篇关于 Socket.IO 工作原理的精彩文章,请确保您阅读了函数更新版本的第二个答案。它比他们的文档更容易阅读。

向除发件人(Socket.io)以外的所有客户端发送响应

据我所知,这也是它在 python 实现中的工作方式。为了便于使用,我肯定会将它用于 websockets。aiohttp 看起来非常强大,但要么没有这个功能,要么隐藏在文档中,要么只写在没有任何文档的代码中。

于 2016-03-05T22:47:03.820 回答
1

更新(2017 年 2 月)

Channels(幸运的是)没有合并到 Django 中。它可能仍然是一个伟大的项目,但它并不真正属于 Django。

另外,我强烈建议您查看 Postgres 相对较新的对 pub/sub 的内置支持。它可能会优于其他任何东西,并且在 aiohttp 上构建一个自定义解决方案,使用 Postgres 作为支持服务,可能是你最好的选择。

原来的

虽然不是aiohttp,但很可能会合并到Django 1.10中的Django Channels以非常直观的方式解决了这个问题,它是由Django migrations的作者Andrew Godwin编写的。

Django Channels 通过在 Django 应用程序前面创建路由层来抽象“许多服务器上的许多进程”的概念。该路由层与后端(例如,Redis)对话以维持进程之间的可共享状态,并使用新的ASGI协议来促进处理 HTTP 请求和 WebSocket,同时将每个请求委托给各自的“消费者”(例如,附带HTTP 请求的内置处理程序,您可以为 WebSockets 编写自己的处理程序)。

Django Channels 有一个名为Groups的概念,它处理问题的“广播”性质;也就是说,它允许服务器上发生的事件向该组中的客户端触发消息,无论它们是连接到相同还是不同的进程或服务器。

恕我直言,Django Channels 很可能被抽象为更通用的 Python 库。还有一些其他 Python 库可以实现类似 Go 的 Channels,但在撰写本文时,没有什么值得注意的可以提供网络透明性。通道在进程和服务器之间进行通信的能力。

于 2016-03-11T17:01:40.437 回答