0

我正在尝试通过 websockets 实现客户端-服务器应用程序,并且我有几个疑问如何正确地做到这一点以维护每个连接的客户端的状态。

全局机器+每个连接的许多对象?
机器 + 对象 - 每个连接?

所以,我从几个测试开始,检查它是如何同时工作的

基础机

class AsyncModel:
    def __init__(self, id_):
        self.req_id = id_

    async def prepare_model(self, _):
        print("prepare_model", self.req_id)


    async def before_change(self, _):
        print("before_change", self.req_id)


    async def after_change(self, _):
        print("After change", self.req_id)


transition = dict(trigger="start", source="Start", dest="Done",
                  prepare="prepare_model",
                  before=["before_change"],
                  after="after_change")

和几种跑步类型

我想让所有模型同时改变它们的状态


async def main():
    tasks = []
    machine = AsyncMachine(model=None,
                           states=["Start", "Done"],
                           transitions=[transition],
                           initial='Start',
                           send_event=True,
                           queued=True)
    for i in range(3):
        model = AsyncModel(id_=i)
        machine.add_model(model)

        tasks.append(model.start())

    await asyncio.gather(*tasks)

    for m in machine.models:
        machine.remove_model(m)

asyncio.run(main())

但输出是:


prepare_model 0
before_change 0
After change 0
prepare_model 1
before_change 1
After change 1
prepare_model 2
before_change 2
After change 2

如果我创建机器+模型:


async def main():
    tasks = []

    for i in range(3):
        model = AsyncModel(id_=i)
        machine = AsyncMachine(model=model,
                               states=["Start", "Done"],
                               transitions=[transition],
                               initial='Start',
                               send_event=True,
                               queued=True)


        tasks.append(model.start())

    await asyncio.gather(*tasks)

输出是:

prepare_model 0
prepare_model 1
prepare_model 2
before_change 0
before_change 1
before_change 2
After change 0
After change 1
After change 2

什么是正确的方法?

更新

我希望每个正在运行的模型都有可用的上下文变量,以便能够正确记录模型调用的其他模块的所有活动,而不是将某些标识符显式传递给每个外部函数调用(outisde 模型类)
参见某种示例https:// pastebin.com/qMfh0kNb,它没有按预期工作,断言触发

4

1 回答 1

2

“什么是正确的方法?”这个问题的常见答案。是“嗯,这取决于......”。如果不清楚您想要实现什么,我只能回答我可以在您的帖子中确定的一般性问题。

有了transitions,我应该为每个型号使用一台机器还是为所有型号使用一台机器?

使用时transitions,它是有状态的模型并且包含转换回调。机器在那里充当一种“规则手册”。因此,当机器具有相同的配置时,对于大多数用例,我建议对所有型号使用一台机器。在大多数情况下,使用具有相同配置的多台机器只会增加内存占用和代码复杂性。在我的脑海中,我可以想到一个用例,其中拥有多台具有相同配置的机器可能会很有用。但首先你可能想知道为什么两个版本的行为不同,即使我刚刚说它应该没有区别。

AsyncMachine为什么在使用 one vs many时以不同的顺序调用回调AsyncMachines

如果没有自定义参数,使用一个AsyncMachine或多个AsyncMachines没有任何区别。但是,您传入queued=True了根据文档执行此操作的构造函数:

如果启用排队处理,则在触发下一个转换之前完成转换

这就是为什么你的单台机器一次会考虑一个转换,在转移到下一个事件/转换之前处理一个模型的所有回调。由于每台机器都有自己的事件/转换队列,因此在使用多台机器时会立即处理事件。queued=True在您使用多台机器的示例中,通过没有任何影响。queued通过不传递参数或传递queued=False(默认值),您可以在一台机器上获得相同的行为。我对您的示例进行了一些修改以进行说明:

from transitions.extensions import AsyncMachine
import asyncio


class AsyncModel:
    def __init__(self, id_):
        self.req_id = id_

    async def prepare_model(self):
        print("prepare_model", self.req_id)

    async def before_change(self):
        print("before_change", self.req_id)

    async def after_change(self):
        print("after change", self.req_id)


transition = dict(trigger="start", source="Start", dest="Done",
                  prepare="prepare_model",
                  before="before_change",
                  after="after_change")

models = [AsyncModel(i) for i in range(3)]


async def main(queued):
    machine = AsyncMachine(model=models,
                           states=["Start", "Done"],
                           transitions=[transition],
                           initial='Start',
                           queued=queued)

    await asyncio.gather(*[model.start() for model in models])
    # alternatively you can dispatch an event to all models of a machine by name
    # await machine.dispatch("start")

print(">>> Queued=True")
asyncio.run(main(queued=False))
print(">>> Queued=False")
asyncio.run(main(queued=False))

所以这取决于你需要什么。使用 ONE 机器,您可以同时使用 - 顺序处理事件queued=True或使用queued=False.

你提到有一个用例可能需要多台机器......

在文档中有这样一段话:

您应该考虑传递queued=True给 TimeoutMachine 构造函数。这将确保事件是按顺序处理的,并避免在超时和事件非常接近发生时可能出现的异步竞速条件。

当使用超时事件或其他紧密连续发生的事件时,当同时处理同一模型上的多个转换时,可能会出现竞争情况。因此,当此问题影响您的用例并且您需要在单独的模型上并行处理转换时,拥有多台具有相同配置的机器可能是一种解决方案。

如何使用中的上下文AsyncMachine

这对我来说是薄冰,我可能是不正确的。我可以尝试简要总结一下我目前对事物为何以某种方式表现的理解。考虑这个例子:

from transitions.extensions import AsyncMachine
import asyncio
import contextvars

context_model = contextvars.ContextVar('model', default=None)
context_message = contextvars.ContextVar('message', default="unset")

def process():
    model = context_model.get()
    print(f"Processing {model.id} Request {model.count} => '{context_message.get()}'")


class Model:

    def __init__(self, id):
        self.id = id
        self.count = 0

    def request(self):
        self.count += 1
        context_message.set(f"Super secret request from {self.id}")

    def nested(self):
        context_message.set(f"Not so secret message from {self.id}")
        process()


models = [Model(i) for i in range(3)]


async def model_loop(model):
    context_model.set(model)
    context_message.set(f"Hello from the model loop of {model.id}")
    while model.count < 3:
        await model.loop()


async def main():
    machine = AsyncMachine(model=models, initial='Start', transitions=[['loop', 'Start', '=']],
                           before_state_change='request',
                           after_state_change=[process, 'nested'])
    await asyncio.gather(*[model_loop(model) for model in models])

asyncio.run(main())

输出:

# Processing 0 Request 1 => 'Hello from the model loop of 0'
# Processing 0 Request 1 => 'Not so secret message from 0'
# Processing 1 Request 1 => 'Hello from the model loop of 1'
# Processing 1 Request 1 => 'Not so secret message from 1'
# Processing 2 Request 1 => 'Hello from the model loop of 2'
# Processing 2 Request 1 => 'Not so secret message from 2'
# Processing 0 Request 2 => 'Hello from the model loop of 0'
# Processing 0 Request 2 => 'Not so secret message from 0'
# Processing 1 Request 2 => 'Hello from the model loop of 1'
# Processing 1 Request 2 => 'Not so secret message from 1'
# Processing 2 Request 2 => 'Hello from the model loop of 2'
# Processing 2 Request 2 => 'Not so secret message from 2'
# Processing 0 Request 3 => 'Hello from the model loop of 0'
# Processing 0 Request 3 => 'Not so secret message from 0'
# Processing 1 Request 3 => 'Hello from the model loop of 1'
# Processing 1 Request 3 => 'Not so secret message from 1'
# Processing 2 Request 3 => 'Hello from the model loop of 2'
# Processing 2 Request 3 => 'Not so secret message from 2'

触发事件已转发到设置两个上下文变量的模型循环。两者都由 process一个使用上下文变量进行处理的全局函数使用。当触发转换时,Model.request将在转换之前调用并增加Model.count. 更改后,将调用Model.state全局函数。processModel.nested

process被调用两次:一次在模型循环中,一次在Model.nested回调中。更改context_message的 fromModel.request不可访问,但更改Model.nested可用于process. 怎么样?因为processModel.request共享相同的父上下文(Model可以检索 的当前值context_message),但是当Model设置变量时,它仅在其当前本地上下文中可用,后面的调用(在另一个回调中)无法访问到process。如果您希望可以访问本地更改,则process需要从回调中触发它,如Model.nested.

长话短说:回调AsyncMachine确实共享相同的父上下文,但不能访问彼此的本地上下文,因此更改无效。但是,当上下文变量是引用(如context_model)时,可以在其他回调中访问对模型的更改。

使用transitions事件queued=True队列contextvars _涉及排队呼叫的转换最终将成功完成。即使只处理一个事件也是如此。 “。触发的事件可能只添加到队列中。紧接着,它的上下文在事件被处理之前被留下。如果您需要排队处理和 contextvars 并且也不能从 INSIDE 模型回调中调用函数,您应该检查asyncio.Lock并将您的调用包装到loop但离开queued=False防止函数调用在完成之前返回。

于 2020-11-07T17:02:50.637 回答