2

场景:我有一个服务于一个简单网站的 sanic 网络服务器。该网站基本上是一个带有 vue 模板支持的 html 中的大型数据表。由于表条目每隔几分钟就会更改一次,因此数据在更改时通过 websocket 传递。大约同时有2000个用户。我试图实现一个发布/订阅架构。

问题:一旦我的 sanic 处理程序返回,我的 websocket 就关闭了。我可以在里面有一个循环来保持处理程序打开。但是保持 2000 个处理程序处于打开状态听起来是个坏主意……而且打开的处理程序的行为也很奇怪。一个线程或一个小线程池应该可以完成这项工作。也许我把 sanic 文档弄错了,需要设计建议。

我尝试过的事情: - 将超时设置增加到足够高 - 在 sanic 中尝试各种其他 websocket 设置 - 让我的客户端 js 返回错误的 onmessage(Javascript websockets 在打开后立即关闭) - 在传递它后将 ws 引用设置为 null

Sanic Webserver 的索引:

@app.route('/')
async def serve_index(request):
    return await file(os.path.join(os.path.dirname(__file__), 'index.html'))

Index.html 的 JS:

var app = new Vue({
    el: '#app',
        data() {
            manydata0: 0,
            manydata1: 0,
            ws: null,
        }
    },
    methods: {
        update: function (json_data) {
            json = JSON.parse(json_data);
            this.manydata0 = json['data0'];
            this.manydata1 = json['data1'];
        }
    },
    created: function () {
        this.ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/reload');
        messages = document.createElement('ul');
        this.ws.onmessage = function (event) {
            console.log("new data")
            app.update(event.data);
        return false;
    };
    document.body.appendChild(messages);
    this.ws.onclose = function (event) {
        console.log("closed :(")
    };

Sanic Webserver 的 Websocket 处理程序(第一版,Sockets 立即死亡):

@app.websocket('/reload')
async def feed(request, ws):
    #time.sleep(42) # this causes the websocket to be created and closed on client side 42 seconds after my request
    await ws.send(Path(json).read_text()) # serve initial data
    connected_clients.append(ws) # subscribe to websocket list. another thread will read list entries and serve them updates

Sanic Webservers 的 Websocket 处理程序(第 2 版,处理程序阻止其他请求处理程序)

@app.websocket('/reload')
async def feed(request, ws):
    mod_time = 0
    while True:
        try:
            stat = os.stat(json)
            if mod_time != stat.st_mtime:
                await ws.send(Path(json).read_text())
        except Exception as e:
            print("Exception while checking file: ", e)
    # this stops the server to handle other @app.routes like css, fonts, favicon

Sanic Webservers 的 Websocket 处理程序(第 3 版,不必要的 recv())

@app.websocket('/reload')
async def feed(request, ws):
    mod_time = 0
    while True:
        try:
            stat = os.stat(json)
            if mod_time != stat.st_mtime:
                await ws.send(Path(json).read_text())
                await recv() # if the client sends from time to time all is fine
        except Exception as e:
            print("Exception while checking file: ", e)

最后两个代码片段差别不大。我添加了一个 ws.recv() 并从客户端发送一些合适的东西(例如在一个间隔内),然后一切正常。然后发送 css、字体和 favicon。但这不可能是故意的,不是吗?这不应该很好地扩展,对吧?

总而言之,这对我来说没有多大意义。我有什么误解?

4

1 回答 1

2

这里是 Sanic 核心开发人员之一。

首先,对于 pubsub 类型架构的示例,这是我准备的要点。我认为这可能会有所帮助。

我的基本想法是创建一个Feed在自己的任务中循环查找事件的对象。就我而言,它是从 pubsub 接收信息。在您的情况下,它应该检查 JSON 文档上的时间。

然后,当Feed.receiver触发事件时,它会向所有正在侦听的客户端发出信号。

websocket处理程序本身内部,您希望保持打开状态。如果不这样做,则连接将关闭。如果您不关心从客户端接收信息,则不需要使用await recv().


所以,在你的情况下,使用超级简单的逻辑,我会做如下的事情。

这是未经测试的代码,可能需要一些调整

import os
import random
import string
from functools import partial
from pathlib import Path

from sanic import Sanic

import asyncio
import websockets
from dataclasses import dataclass, field
from typing import Optional, Set

app = Sanic(__name__)

FILE = "/tmp/foobar"
TIMEOUT = 10
INTERVAL = 20


def generate_code(length=12, include_punctuation=False):
    characters = string.ascii_letters + string.digits
    if include_punctuation:
        characters += string.punctuation
    return "".join(random.choice(characters) for x in range(length))


@dataclass
class Client:
    interface: websockets.server.WebSocketServerProtocol = field(repr=False)
    sid: str = field(default_factory=partial(generate_code, 36))

    def __hash__(self):
        return hash(str(self))

    async def keep_alive(self) -> None:
        while True:
            try:
                try:
                    pong_waiter = await self.interface.ping()
                    await asyncio.wait_for(pong_waiter, timeout=TIMEOUT)
                except asyncio.TimeoutError:
                    print("NO PONG!!")
                    await self.feed.unregister(self)
                else:
                    print(f"ping: {self.sid} on <{self.feed.name}>")
                await asyncio.sleep(INTERVAL)
            except websockets.exceptions.ConnectionClosed:
                print(f"broken connection: {self.sid} on <{self.feed.name}>")
                await self.feed.unregister(self)
                break

    async def shutdown(self) -> None:
        self.interface.close()

    async def run(self) -> None:
        try:
            self.feed.app.add_task(self.keep_alive())
            while True:
                pass
        except websockets.exceptions.ConnectionClosed:
            print("connection closed")
        finally:
            await self.feed.unregister(self)


class Feed:
    app: Sanic
    clients: Set[Client]
    cached = None

    def __init__(self, app: Sanic):
        self.clients = set()
        self.app = app

    @classmethod
    async def get(cls, app: Sanic):
        is_existing = False

        if cls.cached:
            is_existing = True
            feed = cls.cached
        else:
            feed = cls(app)
            cls.cached = feed

        if not is_existing:
            feed.app.add_task(feed.receiver())

        return feed, is_existing

    async def receiver(self) -> None:
        print("Feed receiver started")
        mod_time = 0
        while True:
            try:
                stat = os.stat(FILE)
                print(f"times: {mod_time} | {stat.st_mtime}")
                if mod_time != stat.st_mtime:
                    content = self.get_file_contents()
                    for client in self.clients:
                        try:
                            print(f"\tSending to {client.sid}")
                            await client.interface.send(content)
                        except websockets.exceptions.ConnectionClosed:
                            print(f"ConnectionClosed. Client {client.sid}")
            except Exception as e:
                print("Exception while checking file: ", e)

    async def register(
        self, websocket: websockets.server.WebSocketServerProtocol
    ) -> Optional[Client]:
        client = Client(interface=websocket)
        print(f">>> register {client}")

        client.feed = self
        self.clients.add(client)

        # Send initial content
        content = self.get_file_contents()
        client.interface.send(content)

        print(f"\nAll clients\n{self.clients}\n\n")

        return client

    async def unregister(self, client: Client) -> None:
        print(f">>> unregister {client} on <{self.name}>")
        if client in self.clients:
            await client.shutdown()
            self.clients.remove(client)
            print(f"\nAll remaining clients\n{self.clients}\n\n")

    def get_file_contents(self):
        return Path(FILE).read_text()


@app.websocket("/reload")
async def feed(request, ws):
    feed, is_existing = await Feed.get(app)

    client = await feed.register(ws)
    await client.run()


if __name__ == "__main__":
    app.run(debug=True, port=7777)
于 2019-06-19T08:10:01.567 回答