10

我已经使用Falcon实现了一个 Web 服务。此服务存储一个状态机(pytransitions),该状态机在构造函数中传递给服务的资源。该服务使用gunicorn运行。

Web 服务在启动时使用RxPy启动一个进程。中返回的事件on_next(event)用于触发状态机中的转换。

错误

我希望状态机在服务和资源中具有一致的状态,但似乎在资源中状态永远不会改变。

我们有一个尝试重现此行为的测试,但令人惊讶的是,该测试有效

class TochoLevel(object):

    def __init__(self, tochine):
        self.tochine = tochine

    def on_get(self, req, res):
        res.status = falcon.HTTP_200
        res.body = self.tochine.state


def get_machine():
    states = ["low", "medium", "high"]

    transitions = [
        {'trigger': 'to_medium', 'source': ['low', 'medium', 'high'], 'dest': 'medium'},
        {'trigger': 'to_high', 'source': ['low', 'medium', 'high'], 'dest': 'high'},
        {'trigger': 'to_low', 'source': ['low', 'medium', 'high'], 'dest': 'low'}
    ]

    locked_factory = MachineFactory.get_predefined(locked=True)

    return locked_factory(
        states=states,
        transitions=transitions,
        initial='low',
        auto_transitions=False,
        queued=False
    )

def _level_observable(observer):
    for i in range(1, 21):
        sleep(0.1)
        next_val = 'to_low'

        if 8 <= i <= 15:
            next_val = 'to_medium'
        elif i > 15:
            next_val = 'to_high'
        observer.on_next(next_val)

    observer.on_completed()

def get_level_observable():
    return Observable.create(_level_observable)

class NotBlockingService(falcon.API):
    def __init__(self):
        super(NotBlockingService, self).__init__()

        self.tochine = get_machine()
        self.add_route('/tochez', TochoLevel(self.tochine))

    def _run_machine(self, val):
        self.tochine.trigger(val)
        print('machine exec: {}, state: {}'.format(val, self.tochine.state))
        return self.tochine.state

    def start(self):
        source = get_level_observable()
        (source.subscribe_on(ThreadPoolScheduler(2))
            .subscribe(self._run_machine))


def test_can_query_falcon_service_while_being_susbcribed_as_observer():

    svc = NotBlockingService()
    client = testing.TestClient(svc)

    assert client.simulate_get('/tochez').text == 'low'

    start = time()
    svc.start()
    sleep(1.2)

    assert client.simulate_get('/tochez').text == 'medium'
    end = time()

    sleep(1.2)

    assert client.simulate_get('/tochez').text == 'high'
    assert (end - start) < 2

问题

TochoLevel为什么当我使用 gunicorn 启动服务并在rxpyon_next方法中传播状态时,状态机不会更改资源中 的状态

4

1 回答 1

6

当然,当您在开发模式下执行服务时,您只使用了一个 fork(一个执行过程)。当您使用像 Gunicorn 这样的软件时,您正在使用预分叉策略在生产环境中提供可靠的服务。

Preforking 策略生成许多子流程来解决请求,并且逻辑是独立的,在不同请求之间以独立模式工作每个 fork。

Gunicorn,感谢 Python 中 WSGI 的标准化 App 方案(Python2_PEP-333Python3_PEP-3333),接收一个 APP 对象。Gunicorn 启动其配置中指示的尽可能多的实例(预分叉)。Gunicorn 调用这样的分叉工人默认情况下它使用 1 个工人。每个工作人员都将使用其状态,也许 Gunicorn 还会为每个请求创建新的 App 对象实例......

这就是没有持久性的状态机背后的原因。

提示:首先尝试使用 1 个 worker 启动 Gunicorn 并检查状态机的状态持久性。如果你实现了状态机的持久化,第二个要解决的问题就是状态机同步所有的worker。

于 2018-05-14T08:11:02.747 回答