“什么是正确的方法?”这个问题的常见答案。是“嗯,这取决于......”。如果不清楚您想要实现什么,我只能回答我可以在您的帖子中确定的一般性问题。
有了transitions,我应该为每个型号使用一台机器还是为所有型号使用一台机器?
使用时transitions,它是有状态的模型并且包含转换回调。机器在那里充当一种“规则手册”。因此,当机器具有相同的配置时,对于大多数用例,我建议对所有型号使用一台机器。在大多数情况下,使用具有相同配置的多台机器只会增加内存占用和代码复杂性。在我的脑海中,我可以想到一个用例,其中拥有多台具有相同配置的机器可能会很有用。但首先你可能想知道为什么两个版本的行为不同,即使我刚刚说它应该没有区别。
AsyncMachine为什么在使用 one vs many时以不同的顺序调用回调AsyncMachines?
如果没有自定义参数,使用一个AsyncMachine或多个AsyncMachines没有任何区别。但是,您传入queued=True了根据文档执行此操作的构造函数:
如果启用排队处理,则在触发下一个转换之前完成转换
这就是为什么你的单台机器一次会考虑一个转换,在转移到下一个事件/转换之前处理一个模型的所有回调。由于每台机器都有自己的事件/转换队列,因此在使用多台机器时会立即处理事件。queued=True在您使用多台机器的示例中,通过没有任何影响。queued通过不传递参数或传递queued=False(默认值),您可以在一台机器上获得相同的行为。我对您的示例进行了一些修改以进行说明:
from transitions.extensions import AsyncMachine
import asyncio
class AsyncModel:
def __init__(self, id_):
self.req_id = id_
async def prepare_model(self):
print("prepare_model", self.req_id)
async def before_change(self):
print("before_change", self.req_id)
async def after_change(self):
print("after change", self.req_id)
transition = dict(trigger="start", source="Start", dest="Done",
prepare="prepare_model",
before="before_change",
after="after_change")
models = [AsyncModel(i) for i in range(3)]
async def main(queued):
machine = AsyncMachine(model=models,
states=["Start", "Done"],
transitions=[transition],
initial='Start',
queued=queued)
await asyncio.gather(*[model.start() for model in models])
# alternatively you can dispatch an event to all models of a machine by name
# await machine.dispatch("start")
print(">>> Queued=True")
asyncio.run(main(queued=False))
print(">>> Queued=False")
asyncio.run(main(queued=False))
所以这取决于你需要什么。使用 ONE 机器,您可以同时使用 - 顺序处理事件queued=True或使用queued=False.
你提到有一个用例可能需要多台机器......
在文档中有这样一段话:
您应该考虑传递queued=True给 TimeoutMachine 构造函数。这将确保事件是按顺序处理的,并避免在超时和事件非常接近发生时可能出现的异步竞速条件。
当使用超时事件或其他紧密连续发生的事件时,当同时处理同一模型上的多个转换时,可能会出现竞争情况。因此,当此问题影响您的用例并且您需要在单独的模型上并行处理转换时,拥有多台具有相同配置的机器可能是一种解决方案。
如何使用中的上下文AsyncMachine?
这对我来说是薄冰,我可能是不正确的。我可以尝试简要总结一下我目前对事物为何以某种方式表现的理解。考虑这个例子:
from transitions.extensions import AsyncMachine
import asyncio
import contextvars
context_model = contextvars.ContextVar('model', default=None)
context_message = contextvars.ContextVar('message', default="unset")
def process():
model = context_model.get()
print(f"Processing {model.id} Request {model.count} => '{context_message.get()}'")
class Model:
def __init__(self, id):
self.id = id
self.count = 0
def request(self):
self.count += 1
context_message.set(f"Super secret request from {self.id}")
def nested(self):
context_message.set(f"Not so secret message from {self.id}")
process()
models = [Model(i) for i in range(3)]
async def model_loop(model):
context_model.set(model)
context_message.set(f"Hello from the model loop of {model.id}")
while model.count < 3:
await model.loop()
async def main():
machine = AsyncMachine(model=models, initial='Start', transitions=[['loop', 'Start', '=']],
before_state_change='request',
after_state_change=[process, 'nested'])
await asyncio.gather(*[model_loop(model) for model in models])
asyncio.run(main())
输出:
# Processing 0 Request 1 => 'Hello from the model loop of 0'
# Processing 0 Request 1 => 'Not so secret message from 0'
# Processing 1 Request 1 => 'Hello from the model loop of 1'
# Processing 1 Request 1 => 'Not so secret message from 1'
# Processing 2 Request 1 => 'Hello from the model loop of 2'
# Processing 2 Request 1 => 'Not so secret message from 2'
# Processing 0 Request 2 => 'Hello from the model loop of 0'
# Processing 0 Request 2 => 'Not so secret message from 0'
# Processing 1 Request 2 => 'Hello from the model loop of 1'
# Processing 1 Request 2 => 'Not so secret message from 1'
# Processing 2 Request 2 => 'Hello from the model loop of 2'
# Processing 2 Request 2 => 'Not so secret message from 2'
# Processing 0 Request 3 => 'Hello from the model loop of 0'
# Processing 0 Request 3 => 'Not so secret message from 0'
# Processing 1 Request 3 => 'Hello from the model loop of 1'
# Processing 1 Request 3 => 'Not so secret message from 1'
# Processing 2 Request 3 => 'Hello from the model loop of 2'
# Processing 2 Request 3 => 'Not so secret message from 2'
触发事件已转发到设置两个上下文变量的模型循环。两者都由 process一个使用上下文变量进行处理的全局函数使用。当触发转换时,Model.request将在转换之前调用并增加Model.count. 更改后,将调用Model.state全局函数。processModel.nested
process被调用两次:一次在模型循环中,一次在Model.nested回调中。更改context_message的 fromModel.request不可访问,但更改Model.nested可用于process. 怎么样?因为process和Model.request共享相同的父上下文(Model可以检索 的当前值context_message),但是当Model设置变量时,它仅在其当前本地上下文中可用,后面的调用(在另一个回调中)无法访问到process。如果您希望可以访问本地更改,则process需要从回调中触发它,如Model.nested.
长话短说:回调AsyncMachine确实共享相同的父上下文,但不能访问彼此的本地上下文,因此更改值无效。但是,当上下文变量是引用(如context_model)时,可以在其他回调中访问对模型的更改。
使用transitions事件queued=True队列(contextvars _涉及排队呼叫的转换最终将成功完成。即使只处理一个事件也是如此。 “。触发的事件可能只添加到队列中。紧接着,它的上下文在事件被处理之前被留下。如果您需要排队处理和 contextvars 并且也不能从 INSIDE 模型回调中调用函数,您应该检查asyncio.Lock并将您的调用包装到loop但离开queued=False防止函数调用在完成之前返回。