对于场景 1:
- 在全局命名空间中创建用于存储取消范围和事件的字典(key:
UUID
, val:Tuple[trio.CancelScope, trio.Event]
- 为每个客户端分配唯一的 UUID(客户端唯一的任何内容)
- 让客户端在连接开始时发送 UUID
- 检查字典是否将该 UUID 作为键。如果存在,则取消范围并等待设置事件。
- 现在进行实际传输
对于场景 2:
如果客户端没有明确关闭 websocket,websocket 不知道客户端是否断开连接。因此,我能想到的最好的选择是强制超时并等待客户对每次传输的响应。(这使得这种方法的效率有点低)。
下面是上述想法的演示代码。
客户端代码:
由于我不知道客户端代码是什么样的,所以我只是做了一些客户端来测试您的问题。
这有点bug,但是我没有学过js——请不要太认真地判断客户端代码!
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Websocket test</title>
</head>
<body>
<button id="start">Start connection</button>
<button id="close" disabled>Close connection</button>
<input type="text" id="input_" value="INPUT_YOUR_UUID">
<div id="state">Status: Waiting for connection</div>
<script>
let state = document.getElementById("state")
let start_btn = document.getElementById("start")
let close_btn = document.getElementById("close")
let input_ = document.getElementById("input_")
function sleep(sec) {
state.textContent = `Status: sleeping ${sec} seconds`
return new Promise((func) => setTimeout(func, sec * 1000))
}
function websocket_test() {
return new Promise((resolve, reject) => {
let socket = new WebSocket("ws://127.0.0.1:8000/stream")
socket.onopen = function () {
state.textContent = "Status: Sending UUID - " + input_.value
socket.send(input_.value)
close_btn.disabled = false
close_btn.onclick = function () {socket.close()}
}
socket.onmessage = function (msg) {
state.textContent = "Status: Message Received - " + msg.data
socket.send("Received")
}
socket.onerror = function (error) {
reject(error)
state.textContent = "Status: Error encountered"
}
socket.onclose = function () {
state.textContent = "Status: Connection Stopped"
close_btn.disabled = true
}
})
}
start_btn.onclick = websocket_test
</script>
</body>
</html>
服务器代码:
在之前的测试中,我看到服务器抛出超时,但无法重现它 - 如果对行为有信心,您可能不需要trio.fail_after
和except trio.TooSlowError
部分。
"""
Nursery cancellation demo
"""
import itertools
import trio
import fastapi
import hypercorn
from hypercorn.trio import serve
NURSERY = trio.open_nursery()
GLOBAL_NURSERY_STORAGE = {}
TIMEOUT = 5
router = fastapi.APIRouter()
@router.websocket('/stream')
async def run_task(websocket: fastapi.WebSocket):
# accept and receive UUID
# Replace UUID with anything client-specific
await websocket.accept()
uuid_ = await websocket.receive_text()
print(f"[{uuid_}] CONNECTED")
# check if nursery exist in session, if exists, cancel it and wait it to end.
cancel_scope: trio.CancelScope
event: trio.Event
try:
cancel_scope, event = GLOBAL_NURSERY_STORAGE[uuid_]
except KeyError:
pass
else:
print(f"[{uuid_}] STOPPING NURSERY")
cancel_scope.cancel()
await event.wait()
# create new event, and start new nursery.
cancel_done_event = trio.Event()
async with trio.open_nursery() as nursery:
# save ref
GLOBAL_NURSERY_STORAGE[uuid_] = nursery.cancel_scope, cancel_done_event
try:
for n in itertools.count(0, 1):
nursery.start_soon(task, n, uuid_, websocket)
await trio.sleep(1)
# wait for client response
with trio.fail_after(TIMEOUT):
recv = await websocket.receive_text()
print(f"[{uuid_}] RECEIVED {recv}")
except trio.TooSlowError:
# client possibly left without proper disconnection.
print(f"[{uuid_}] CLIENT TIMEOUT")
except fastapi.websockets.WebSocketDisconnect:
print(f"[{uuid_}] CLIENT DISCONNECTED")
# fire event, and pop reference if any.
print(f"[{uuid_}] NURSERY STOPPED & REFERENCE DROPPED")
cancel_done_event.set()
GLOBAL_NURSERY_STORAGE.pop(uuid_, None)
async def task(text, uuid_, websocket: fastapi.WebSocket):
await websocket.send_text(str(text))
print(f"[{uuid_}] SENT {text}")
if __name__ == '__main__':
cornfig = hypercorn.Config()
# cornfig.bind = "ws://127.0.0.1:8000"
trio.run(serve, router, cornfig)
示例运行输出:
客户

服务器
[2022-01-31 21:23:12 +0900] [17204] [INFO] Running on http://127.0.0.1:8000 (CTRL + C to quit)
[2] CONNECTED < start connection on tab 2
[2] SENT 0
[2] RECEIVED Received
[2] SENT 1
[2] RECEIVED Received
[2] SENT 2
[2] RECEIVED Received
[2] SENT 3
[2] RECEIVED Received
[2] SENT 4
[1] CONNECTED < start connection on tab 1
[1] SENT 0
[2] RECEIVED Received
[2] SENT 5
[1] RECEIVED Received
[1] SENT 1
...
[2] SENT 18
[1] RECEIVED Received
[1] SENT 14
[2] RECEIVED Received
[2] SENT 19
[1] CLIENT DISCONNECTED < closed connection on tab 1
[1] NURSERY STOPPED & REFERENCE DROPPED < tab 1 nursery terminated
[2] RECEIVED Received
[2] SENT 20
[2] RECEIVED Received
[2] SENT 21
[1] CONNECTED < start connection on tab 1
[1] SENT 0
[2] RECEIVED Received
[2] SENT 22
[1] RECEIVED Received
...
[2] SENT 26
[1] RECEIVED Received
[1] SENT 5
[2] CLIENT DISCONNECTED < tab 2 closed
[2] NURSERY STOPPED & REFERENCE DROPPED < tab 2 nursery terminated
[1] RECEIVED Received
[1] SENT 6
[1] RECEIVED Received
[1] SENT 7
[1] RECEIVED Received
[1] SENT 8
[1] CONNECTED < start another connection on tab 1 without closing
[1] STOPPING NURSERY < previous connection on tab 1 terminating
[1] NURSERY STOPPED & REFERENCE DROPPED < previous connection on tab 1 terminated
[1] SENT 0
[1] RECEIVED Received
[1] SENT 1
...
[1] RECEIVED Received
[1] SENT 8
[1] CLIENT DISCONNECTED < Refreshed tab 1
[1] NURSERY STOPPED & REFERENCE DROPPED < tab 1 nursery terminated
...