0

我们正在尝试使用 Tornado 搭建一个 websocket,当侦听器各自的 MongoDB 数据库发生更改时,向这些侦听器推送更新。我们需要监听多个数据库,但不想一直监听所有数据库,而只监听当前有订阅者订阅的那些数据库。我们找到了一个变通办法:关闭 change_stream,然后用更新后的查询自动重新启动它。但是,我们想知道是否可以直接修改处于活动状态的 change_stream 的查询/管道。有谁知道这是否可行?如果可行,我们该怎么做?

websocket 的 open 和 on_close 方法:

    def open(self):
        """Handle a new incoming connection.

        Reads the ``agent`` and ``tenant`` query parameters from the
        connection URL, registers this client under its tenant in
        ``WebSocket.tenant_clients`` and, when the tenant has no clients yet,
        refreshes the global ``db_names`` list and schedules
        ``restart_watcher`` on the IOLoop.

        A connection that lacks either parameter is closed instead of being
        registered.
        """
        global db_names
        print("opened")

        # Query parameters arrive as lists of raw bytes values.
        arguments = self.request.arguments
        agent_values = arguments.get("agent")
        tenant_values = arguments.get("tenant")

        # The URL is client-controlled: a missing parameter used to surface as
        # a TypeError from indexing ``None``. Reject the connection explicitly.
        if not agent_values or not tenant_values:
            # 1008 is the WebSocket "policy violation" close code.
            self.close(code=1008, reason="agent and tenant are required")
            return

        self.agent = agent_values[0].decode("utf-8")
        self.tenant = tenant_values[0].decode("utf-8")

        # First client for this tenant: start tracking its database and have
        # the watcher restarted so it picks up the new ``db_names``.
        if self.tenant not in WebSocket.tenant_clients:
            WebSocket.tenant_clients[self.tenant] = set()

            db_names = list(WebSocket.tenant_clients)
            tornado.ioloop.IOLoop.current().add_callback(restart_watcher)

        # Add the client to the related tenant connection set
        WebSocket.tenant_clients[self.tenant].add(self)

    def on_close(self):
        """Handle connection close.

        Removes this client from its tenant's connection set. When the set
        becomes empty the tenant is dropped from ``WebSocket.tenant_clients``,
        the global ``db_names`` list is refreshed and ``restart_watcher`` is
        scheduled, mirroring what ``open`` does when a tenant is added.
        """
        global db_names
        print("closed")

        # ``open`` may have failed before ``self.tenant`` was assigned or
        # before the client was registered; in that case there is nothing to
        # clean up.
        tenant = getattr(self, "tenant", None)
        clients = WebSocket.tenant_clients.get(tenant)
        if clients is None:
            return

        # Remove the client from the tenant connection set
        clients.discard(self)

        if not clients:
            WebSocket.tenant_clients.pop(tenant, None)
            db_names = list(WebSocket.tenant_clients)
            # NOTE(review): presumably ``restart_watcher`` makes the watcher
            # rebuild its pipeline from ``db_names`` (as when ``open``
            # schedules it); without this the stream keeps watching the
            # dropped tenant until the next restart -- confirm.
            tornado.ioloop.IOLoop.current().add_callback(restart_watcher)

watch 函数:

async def watch(collection):
    """Forward insert events from a change stream to the websocket clients.

    Runs forever: builds a ``$match`` pipeline from the global ``db_names``
    (falling back to ``DEFAULT_DB_NAME`` when it is empty), opens a change
    stream, and passes every change to ``WebSocket.send_updates``. Whenever
    the stream ends or fails, the loop reopens it with a freshly built
    pipeline, resuming after the last token seen.

    Args:
        collection: Object whose ``watch(pipeline, start_after=...)`` returns
            an async context manager yielding an async-iterable change
            stream. ``main`` passes the ``MotorClient`` itself.
    """
    global change_stream, db_names
    # Local import keeps this block self-contained.
    import asyncio

    retry_delay_seconds = 1
    resume_token = None

    while True:
        try:
            print("restarted", db_names)

            watched_dbs = db_names or DEFAULT_DB_NAME
            # NOTE(review): DEFAULT_DB_NAME is defined elsewhere; if it is a
            # single string, iterating it directly would yield one "ns.db"
            # clause per character, so wrap it in a list.
            if isinstance(watched_dbs, str):
                watched_dbs = [watched_dbs]

            pipeline = [
                {"$match":
                    {
                        "$and": [
                            {"operationType": "insert"},
                            {"ns.coll": "test_collection"}
                        ],
                        "$or": [
                            {"ns.db": db_name}
                            for db_name in watched_dbs
                        ]
                    }
                }
            ]

            async with collection.watch(pipeline, start_after=resume_token) as change_stream:
                async for change in change_stream:
                    # Remember the position so a restarted stream continues
                    # after this change.
                    resume_token = change_stream.resume_token
                    WebSocket.send_updates(change)

        except pymongo.errors.PyMongoError as e:
            print("Error in mongo Watch: {}".format(e))
        except Exception as e:
            print("Error in WebSocket class: {}".format(e))
        else:
            # The stream ended without an error: reopen it right away with
            # the current ``db_names``.
            continue

        # Back off after a failure. Without a pause, a persistent error makes
        # this loop spin, and an error raised before the first ``await``
        # would never yield control back to the event loop.
        await asyncio.sleep(retry_delay_seconds)

启动 websocket 和 watcher 的 main 函数:

def main():
    """Start the websocket server and the MongoDB watcher.

    Creates the Motor client, serves ``WebSocket`` at ``/socket`` on port
    8000, schedules ``watch`` on the IOLoop and runs the loop until it is
    interrupted. On exit, any change stream that ``watch`` opened is closed.
    """
    print("starting websocket...")
    MONGO_URL = 'MONGO_URL_GOES_HERE'
    client = MotorClient(MONGO_URL)

    app = tornado.web.Application(
        [(r"/socket", WebSocket)],
        websocket_ping_interval=10
    )

    app.listen(8000)

    loop = tornado.ioloop.IOLoop.current()
    loop.add_callback(watch, client)

    try:
        loop.start()
    except KeyboardInterrupt:
        pass
    finally:
        # ``change_stream`` is a module global assigned by ``watch``; it may
        # not exist yet if no stream was ever opened, so look it up without
        # risking a NameError.
        stream = globals().get("change_stream")
        if stream is not None:
            # Motor's change-stream ``close`` is asynchronous: calling it
            # without awaiting would only create an un-awaited coroutine.
            # Run it to completion on the loop instead.
            loop.run_sync(stream.close)
4

0 回答 0