0

在我设计用于远程管理 RPI 上的某些服务的基于烧瓶的 http 服务器中,我遇到了一个我无法单独解决的问题,因此请您给我一个提示。

概念:通过 flask 和 gevent,我可以停止并运行一些(两个)在 RPI 上运行的服务。我使用 gevent 和关于 javascript 的服务器端事件来收听 html 更新。

html 页面显示服务的状态(开/关/处理),并提供打开/关闭它们的按钮。另外显示一些系统参数(CPU、RAM、HDD、NET)。

只要只有一个用户/页面打开,一切都按预期工作。一旦有更多用户访问烧瓶服务器,就会在为每个用户/页面提供服务的 greenlet 之间进行竞争,并且并非所有页面都会重新加载。

问题:如何向所有正在运行的 greenlets sse_worker() 发送消息并在他们的常规工作之上处理它?

在高级代码下方。完整的源代码可以在这里找到:https ://github.com/petervflocke/flasksse_rpi检查 sse.py 文件

def sse_worker(): #neverending task
   while True:
       if there_is_a_change_in_process_status:
          reload_page=True
       else: 
          reload_page=False
       Do some other tasks:
          update some_single_parameters_to_be_passed_to_html_page
       yield 'data: ' + json.dumps(all_parameters)
       gevent.sleep(1)

@app.route('/stream/', methods=['GET', 'POST'])
def stream():
    return Response(sse_worker(), mimetype="text/event-stream")

if __name__ == "__main__":
    gevent.signal(signal.SIGTERM, stop)
    http_server = WSGIServer(('', 5000), app)
    http_server.serve_forever()

...在 html 页面上,相应地处理流式 json 数据。如果服务的状态已根据 reload_page 变量 javascript 重新加载完整页面 - 代码摘录如下:

<script>
    function listen() {
        var source = new EventSource("/stream/");
        var target1 = document.getElementById("time");
        ....  
        source.onmessage = function(msg) {
          obj = JSON.parse(msg.data);
          target1.innerHTML = obj.time;
        ....
          if (obj.reload == "1") {
            location.reload();
          }
        }
    }
    listen();
</script>

我想要的解决方案是像这样扩展 sse_worker() :

def sse_worker():
   while True:
       if there_is_a_change_in_process_status:
          reload_page=True
          # NEW: set up a semaphore/flag that there is a change on the page
          message_set(reload)
       elif message_get(block=false)==reload: # NEW: check the semaphore
          # issue: the message_get must retun "reload" for _all_ active sse_workers, that all of them can push the reload to "their" pages 
          reload_page=True
       else: 
          reload_page=False
       Do some other tasks:
          update some_single_parameters_to_be_passed_to_html_page
       yield 'data: ' + json.dumps(all_parameters)
       gevent.sleep(1)

我希望我能传递我的信息。您有什么想法可以解决同步问题吗?请注意,我们在同一个 sse_worker 函数中有生产者和消费者。

任何想法都非常受欢迎!最好的问候彼得

4

0 回答 0