我们正在尝试使用 Tornado 设置一个 websocket,它使用他们自己的 MongoDB 数据库中的更改来更新侦听器。我们想听几个数据库。我们试图不经常收听所有数据库,而只收听订阅者订阅的数据库。我们通过关闭 change_stream 并使用更新的查询自动重新启动它找到了解决方法。但是,我们想知道是否也可以更改活动 change_stream 的查询/管道。有谁知道这是否可行,如果可以,我们该怎么做。
websocket的open&on_close函数:
def open(self):
""" Handle new incoming connection """
global db_names
print("opened")
# Fetch the arguments / params from the websocket connection url
arguments = self.request.arguments
# Fetch the agent & tenant from the arguments & decode from bytes to utf-8
self.agent = arguments.get("agent")[0].decode("utf-8")
self.tenant = arguments.get("tenant")[0].decode("utf-8")
# If the tenant is not yet
if self.tenant not in WebSocket.tenant_clients.keys():
WebSocket.tenant_clients[self.tenant] = set()
db_names = list(WebSocket.tenant_clients.keys())
tornado.ioloop.IOLoop.current().add_callback(restart_watcher)
# Add the client to the related tenant connection set
WebSocket.tenant_clients[self.tenant].add(self)
def on_close(self):
""" Handle connection close """
global db_names
print("closed")
# Remove the client from the tenant connection set
WebSocket.tenant_clients[self.tenant].remove(self)
if len(WebSocket.tenant_clients[self.tenant]) == 0:
WebSocket.tenant_clients.pop(self.tenant)
db_names = list(WebSocket.tenant_clients.keys())
手表功能
async def watch(collection):
global change_stream, db_names
resume_token = None
while True:
try:
print("restarted", db_names)
pipeline = [
{"$match":
{
"$and": [
{"operationType": "insert"},
{"ns.coll": "test_collection"}
],
"$or": [
{"ns.db": db_name}
for db_name in db_names or DEFAULT_DB_NAME
]
}
}
]
async with collection.watch(pipeline, start_after=resume_token) as change_stream:
async for change in change_stream:
resume_token = change_stream.resume_token
WebSocket.send_updates(change)
except pymongo.errors.PyMongoError as e:
print("Error in mongo Watch: {}".format(e))
except Exception as e:
print("Error in WebSocket class: {}".format(e))
启动 websocket 和 watcher 的主要功能
def main():
print("starting websocket...")
MONGO_URL = 'MONGO_URL_GOES_HERE'
client = MotorClient(MONGO_URL)
app = tornado.web.Application(
[(r"/socket", WebSocket)],
websocket_ping_interval=10
)
app.listen(8000)
loop = tornado.ioloop.IOLoop.current()
loop.add_callback(watch, client)
try:
loop.start()
except KeyboardInterrupt:
pass
finally:
if change_stream is not None:
change_stream.close()