9

我想为与多个工作人员(即多个进程)一起运行的 Flask 应用程序提供共享状态。

引用关于此主题的类似问题的答案:

您不能使用全局变量来保存此类数据。[...] 使用 Flask 之外的数据源来保存全局数据。数据库、memcached 或 redis 都是合适的单独存储区域,具体取决于您的需要。

(来源:Flask 中的全局变量线程安全吗?如何在请求之间共享数据?

我的问题是关于如何在 Flask“外部”提供数据的建议的最后一部分。目前,我的网络应用程序非常小,我想避免对其他程序的要求或依赖。如果我不想在后台运行 Redis 或其他任何东西,而是为所有内容提供 Web 应用程序的 Python 代码,我有什么选择?

4

2 回答 2

19

如果您的网络服务器的工作器类型与multiprocessing模块兼容,您可以使用multiprocessing.managers.BaseManager它为 Python 对象提供共享状态。一个简单的包装器可能如下所示:

from multiprocessing import Lock
from multiprocessing.managers import AcquirerProxy, BaseManager, DictProxy

def get_shared_state(host, port, key):
    shared_dict = {}
    shared_lock = Lock()
    manager = BaseManager((host, port), key)
    manager.register("get_dict", lambda: shared_dict, DictProxy)
    manager.register("get_lock", lambda: shared_lock, AcquirerProxy)
    try:
        manager.get_server()
        manager.start()
    except OSError:  # Address already in use
        manager.connect()
    return manager.get_dict(), manager.get_lock()

您可以将数据分配给shared_dict以使其跨进程访问:

HOST = "127.0.0.1"
PORT = 35791
KEY = b"secret"
shared_dict, shared_lock = get_shared_state(HOST, PORT, KEY)

shared_dict["number"] = 0
shared_dict["text"] = "Hello World"
shared_dict["array"] = numpy.array([1, 2, 3])

但是,您应该注意以下情况:

  • 用于shared_lock在覆盖shared_dict. (请参阅下面的 Flask 示例。)
  • 没有数据持久性。如果您重新启动应用程序,或者如果主(第一个)BaseManager进程终止,则共享状态将消失。
  • 通过这个简单的实现BaseManager,您不能直接在shared_dict. 例如,shared_dict["array"][1] = 0没有效果。您必须编辑副本,然后将其重新分配给字典键。

烧瓶示例:

以下 Flask 应用程序使用全局变量来存储计数器编号:

from flask import Flask
app = Flask(__name__)

number = 0

@app.route("/")
def counter():
    global number
    number += 1
    return str(number)

这在仅使用 1 个 worker 时有效gunicorn -w 1 server:app。当使用多个工作进程gunicorn -w 4 server:app时,很明显这number不是共享状态,而是每个工作进程的独立状态。

相反,使用shared_dict,应用程序看起来像这样:

from flask import Flask
app = Flask(__name__)

HOST = "127.0.0.1"
PORT = 35791
KEY = b"secret"
shared_dict, shared_lock = get_shared_state(HOST, PORT, KEY)

shared_dict["number"] = 0

@app.route("/")
def counter():
    with shared_lock:
        shared_dict["number"] += 1
    return str(shared_dict["number"])

这适用于任意数量的工人,例如gunicorn -w 4 server:app.

于 2019-09-05T18:07:08.040 回答
4

你的例子对我来说有点神奇!我建议multiprocessingNamespace. 我试图使以下代码与spawn服务器(即 MS Windows)兼容,但我只能访问 Linux 机器,所以无法在那里测试

首先引入依赖项并定义我们的自定义Manager并注册一个方法以获取Namespace单例:

from multiprocessing.managers import BaseManager, Namespace, NamespaceProxy

class SharedState(BaseManager):
    _shared_state = Namespace(number=0)

    @classmethod
    def _get_shared_state(cls):
        return cls._shared_state

SharedState.register('state', SharedState._get_shared_state, NamespaceProxy)

如果创建初始状态的成本很高,那么这可能需要更复杂,因此只能在需要时进行。请注意,如果 gunicorn 稍后启动一个新的工作进程,例如在由于超时杀死一个工作进程之后,进程启动期间初始化状态的 OPs 版本将导致一切重置

接下来我定义一个函数来访问这个共享状态,类似于 OP 的做法:

def shared_state(address, authkey):
    manager = SharedState(address, authkey)
    try:
        manager.get_server()  # raises if another server started
        manager.start()
    except OSError:
        manager.connect()
    return manager.state()

虽然我不确定我是否建议做这样的事情。启动时gunicorn会产生许多进程,所有进程都竞相运行此代码,如果有时会出错,我也不会感到惊讶。此外,如果它碰巧杀死了服务器进程(例如由于超时),其他所有进程都将开始失败

也就是说,如果我们想使用它,我们会做类似的事情:

ss = shared_state('server.sock', b'noauth')

ss.number += 1

这使用 Unix 域套接字(传递一个字符串而不是一个元组作为地址)来锁定它一点。

另请注意,这与 OP 的代码具有相同的竞争条件:增加一个数字将导致该值被传输到工作进程,然后增加,并发送回服务器。我不确定_lock应该保护什么,但我认为它不会起到太大作用

于 2019-09-06T14:37:56.183 回答