下面给出了使用 node.js 的答案。
但是,如何在 python Flask 中做同样的事情?
请务必注意,您还需要关闭客户端上的连接,否则它将在retry
超时后尝试重新打开连接。
这让我很困惑,直到我在这里看到这篇文章:https ://stackoverflow.com/a/38235218/5180047
从链接:
这里的问题是服务器意外关闭了连接,而不是在保持打开状态时进行工作。发生这种情况时,客户端会重新发送请求以打开连接并开始流式传输服务器发送的事件。然后服务器一次又一次地关闭连接,导致无限循环。
在我看到这个之后.. 我首选的解决方案是通过产生预期的数据消息来关闭与服务器的连接。
在客户端(浏览器)上
var sse = new EventSource('/sse');
sse.addEventListener('message', function(e) {
console.log(e);
var data = e.data;
if (!data) {
console.log('no data in event');
return;
}
if (data === 'finished') {
console.log('closing connection')
sse.close()
}
// my work here based on message
});
我的烧瓶服务器
from flask import Response
@app_api.route("/sse")
def stream():
def trackerStream():
finished = check_if_finished()
while not finished:
sleep(1) # poll timeout
if should_send_event():
yield f"data: {mydata()}\n\n"
else:
yield "data: nodata\n\n"
finished = check_if_finished()
yield "data: finished\n\n"
return Response(trackerStream(), mimetype="text/event-stream")
注意:确保您始终以一定的时间间隔从服务器向客户端发送事件。如果用户关闭浏览器,flask 在尝试写入套接字时会出错,并会为您关闭流。如果您没有在某个时间间隔写入客户端,即使您只是在写入data: nodata\n\n
,那么服务器可能会陷入循环。
好吧,这取决于您的应用程序的架构。
让我给你看一个例子(在https://github.com/jkbr/chat/blob/master/app.py看到这个代码):
def event_stream():
pubsub = red.pubsub()
pubsub.subscribe('chat')
for message in pubsub.listen():
print message
yield 'data: %s\n\n' % message['data']
@app.route('/stream')
def stream():
return flask.Response(event_stream(),
mimetype="text/event-stream")
Flask 稳定地向 Redis 请求一条新消息(锁定操作),但是当 Flask 看到流式传输终止时(StopIteration
如果您不是 Python 新手),它会返回。
def event_stream():
pubsub = red.pubsub()
pubsub.subscribe('chat')
for message in pubsub.listen():
if i_should_close_the_connection:
break
yield 'data: %s\n\n' % message['data']
@app.route('/stream')
def stream():
return flask.Response(event_stream(),
mimetype="text/event-stream")
我遇到了同样的问题,最后在这里找到了以下解决方案。
import time
from flask import Flask, Response, stream_with_context
app = Flask(__name__)
@app.route('/stream')
def stream():
def gen():
try:
i = 0
while True:
data = 'this is line {}'.format(i)
print(data)
yield data + '<br>'
i += 1
time.sleep(1)
except GeneratorExit:
print('closed')
return Response(stream_with_context(gen()))
您可以通过订阅 Redis 的侦听器来启动该方法。而不是time.sleep(1)
你可以等待 Redis 的listen()
方法返回一个值。print('closed')
您可以取消订阅 Redis 。剩下的唯一问题GeneratorExit
是只有在将产量值发送到客户端时才会引发异常。因此,如果 Redislisten()
永远不会结束,那么您将永远不会发现连接已断开。