我想将监听on_enter事件委托给所有状态,并创建几个这样的监听器,它们可以订阅并在进入状态时收到通知。
就我而言,我想通知外部事件系统根据状态订阅不同的事件配置。
对于这个例子,我将使用一个状态机(比如固体<->流体<->气体事件[热,冷])。
这可以很容易地使用这样的库来完成
from transitions import Machine
from transitions import EventData
class Matter(object):
def __init__(self):
transitions = [
{'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
{'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
{'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
{'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
]
self.machine = Machine(
model=self,
states=['solid', 'liquid', 'gas'],
transitions=transitions,
initial='solid',
send_event=True
)
def on_enter_gas(self, event: EventData):
print(f"entering gas from {event.transition.source}")
def on_enter_liquid(self, event: EventData):
print(f"entering liquid from {event.transition.source}")
def on_enter_solid(self, event: EventData):
print(f"entering solid from {event.transition.source}")
matter = Matter()
matter.heat() # entering liquid from solid
matter.heat() # entering gas from liquid
matter.cool() # entering liquid from gas
matter.cool() # entering solid from liquid
伟大的!现在,我想通过订阅事件向外部通知。
我想以一种至少将外部世界与机器内部耦合的方式来做到这一点,这样如果我要更改状态名称,或者添加或删除状态,我就不用担心破坏任何用户机器。on_enter
我可以做到这一点的一种方法如下,它的缺点是与机器内部耦合,并迫使我自己实现库的许多功能。
from transitions import Machine
from transitions import EventData
from typing import Callable
class Matter(object):
states = ['solid', 'liquid', 'gas']
def __init__(self):
transitions = [
{'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
{'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
{'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
{'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
]
self.machine = Machine(
model=self,
states=self.states,
transitions=transitions,
initial='solid',
send_event=True
)
self._subscriptions = {}
def on_enter_gas(self, event: EventData):
print(f"entering gas from {event.transition.source}")
if "on_enter_gas" in self._subscriptions:
self._subscriptions["on_enter_solid"]()
def on_enter_liquid(self, event: EventData):
print(f"entering liquid from {event.transition.source}")
if "on_enter_liquid" in self._subscriptions:
self._subscriptions["on_enter_solid"]()
def on_enter_solid(self, event: EventData):
print(f"entering solid from {event.transition.source}")
if "on_enter_solid" in self._subscriptions:
self._subscriptions["on_enter_solid"]()
def subscribe(self, state: str, trigger: str, callback: Callable):
assert state in self.states
machine_event = trigger + "_" + state
if machine_event not in self._subscriptions:
self._subscriptions[machine_event] = callback
这允许为任何状态添加外部回调。
根据这里的评论,上面应该有一些更好的 API 来动态添加每个状态的订阅,但我无法在文档中找到它。
即使图书馆确实可以做到这一点,我认为这还不够。
任何订阅者都必须知道机器的状态才能订阅 <on_enter> 它们,而不是简单地成为机器上的侦听器,并实现任何要通知它发生的事件,就像一个人可以轻松地添加一个on_enter_solidjust通过“固体”状态的存在。
我最理想的做法是拥有一些我可以继承(或以其他方式)的监听器类,并且只在外部实现我需要监听的方法。
使用该库完成此操作或类似操作的最佳方法是什么?