在我设计用于远程管理 RPI 上的某些服务的基于烧瓶的 http 服务器中,我遇到了一个我无法单独解决的问题,因此请您给我一个提示。
概念:通过 flask 和 gevent,我可以停止并运行一些(两个)在 RPI 上运行的服务。我使用 gevent 和关于 javascript 的服务器端事件来收听 html 更新。
html 页面显示服务的状态(开/关/处理),并提供打开/关闭它们的按钮。另外显示一些系统参数(CPU、RAM、HDD、NET)。
只要只有一个用户/页面打开,一切都按预期工作。一旦有更多用户访问烧瓶服务器,就会在为每个用户/页面提供服务的 greenlet 之间进行竞争,并且并非所有页面都会重新加载。
问题:如何向所有正在运行的 greenlets sse_worker() 发送消息并在他们的常规工作之上处理它?
在高级代码下方。完整的源代码可以在这里找到:https ://github.com/petervflocke/flasksse_rpi检查 sse.py 文件
def sse_worker(): #neverending task
while True:
if there_is_a_change_in_process_status:
reload_page=True
else:
reload_page=False
Do some other tasks:
update some_single_parameters_to_be_passed_to_html_page
yield 'data: ' + json.dumps(all_parameters)
gevent.sleep(1)
@app.route('/stream/', methods=['GET', 'POST'])
def stream():
return Response(sse_worker(), mimetype="text/event-stream")
if __name__ == "__main__":
gevent.signal(signal.SIGTERM, stop)
http_server = WSGIServer(('', 5000), app)
http_server.serve_forever()
...在 html 页面上,相应地处理流式 json 数据。如果服务的状态已根据 reload_page 变量 javascript 重新加载完整页面 - 代码摘录如下:
<script>
function listen() {
var source = new EventSource("/stream/");
var target1 = document.getElementById("time");
....
source.onmessage = function(msg) {
obj = JSON.parse(msg.data);
target1.innerHTML = obj.time;
....
if (obj.reload == "1") {
location.reload();
}
}
}
listen();
</script>
我想要的解决方案是像这样扩展 sse_worker() :
def sse_worker():
while True:
if there_is_a_change_in_process_status:
reload_page=True
# NEW: set up a semaphore/flag that there is a change on the page
message_set(reload)
elif message_get(block=false)==reload: # NEW: check the semaphore
# issue: the message_get must retun "reload" for _all_ active sse_workers, that all of them can push the reload to "their" pages
reload_page=True
else:
reload_page=False
Do some other tasks:
update some_single_parameters_to_be_passed_to_html_page
yield 'data: ' + json.dumps(all_parameters)
gevent.sleep(1)
我希望我能传递我的信息。您有什么想法可以解决同步问题吗?请注意,我们在同一个 sse_worker 函数中有生产者和消费者。
任何想法都非常受欢迎!最好的问候彼得