0

我正在尝试使用转换库。
这个问题紧随其后,相当松散。

我想将监听on_enter事件委托给所有状态,并创建几个这样的监听器,它们可以订阅并在进入状态时收到通知。
就我而言,我想通知外部事件系统根据状态订阅不同的事件配置。


对于这个例子,我将使用一个状态机(比如固体<->流体<->气体事件[热,冷])。

这可以很容易地使用这样的库来完成

from transitions import Machine
from transitions import EventData


class Matter(object):
    def __init__(self):
        transitions = [
            {'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
            {'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
            {'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
            {'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
        ]
        self.machine = Machine(
                model=self,
                states=['solid', 'liquid', 'gas'],
                transitions=transitions,
                initial='solid',
                send_event=True
        )

    def on_enter_gas(self, event: EventData):
        print(f"entering gas from {event.transition.source}")

    def on_enter_liquid(self, event: EventData):
        print(f"entering liquid from {event.transition.source}")

    def on_enter_solid(self, event: EventData):
        print(f"entering solid from {event.transition.source}")


matter = Matter()
matter.heat()  # entering liquid from solid
matter.heat()  # entering gas from liquid
matter.cool()  # entering liquid from gas
matter.cool()  # entering solid from liquid

伟大的!现在,我想通过订阅事件向外部通知。 我想以一种至少将外部世界与机器内部耦合的方式来做到这一点,这样如果我要更改状态名称,或者添加或删除状态,我就不用担心破坏任何用户机器。on_enter

我可以做到这一点的一种方法如下,它的缺点是与机器内部耦合,并迫使我自己实现库的许多功能。

from transitions import Machine
from transitions import EventData
from typing import Callable


class Matter(object):
    states = ['solid', 'liquid', 'gas']
    
    def __init__(self):
        transitions = [
            {'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
            {'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
            {'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
            {'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
        ]
        self.machine = Machine(
                model=self,
                states=self.states,
                transitions=transitions,
                initial='solid',
                send_event=True
        )

        self._subscriptions = {}

    def on_enter_gas(self, event: EventData):
        print(f"entering gas from {event.transition.source}")
        if "on_enter_gas" in self._subscriptions:
            self._subscriptions["on_enter_solid"]()

    def on_enter_liquid(self, event: EventData):
        print(f"entering liquid from {event.transition.source}")
        if "on_enter_liquid" in self._subscriptions:
            self._subscriptions["on_enter_solid"]()

    def on_enter_solid(self, event: EventData):
        print(f"entering solid from {event.transition.source}")
        if "on_enter_solid" in self._subscriptions:
            self._subscriptions["on_enter_solid"]()
        
    def subscribe(self, state: str, trigger: str, callback: Callable):
        assert state in self.states
        machine_event = trigger + "_" + state
        if machine_event not in self._subscriptions:
            self._subscriptions[machine_event] = callback            

这允许为任何状态添加外部回调。

根据这里的评论,上面应该有一些更好的 API 来动态添加每个状态的订阅,但我无法在文档中找到它。


即使图书馆确实可以做到这一点,我认为这还不够。
任何订阅者都必须知道机器的状态才能订阅 <on_enter> 它们,而不是简单地成为机器上的侦听器,并实现任何要通知它发生的事件,就像一个人可以轻松地添加一个on_enter_solidjust通过“固体”状态的存在。

我最理想的做法是拥有一些我可以继承(或以其他方式)的监听器类,并且只在外部实现我需要监听的方法。

使用该库完成此操作或类似操作的最佳方法是什么?

4

1 回答 1

1

我想通过订阅从外部通知 on_enter 事件。我想以一种至少将外部世界与机器内部耦合的方式来做到这一点,这样如果我要更改状态名称,或者添加或删除状态,我就不用担心破坏任何用户机器。

最少的耦合是转发事件并让订阅者决定如何处理它:

from transitions import Machine
from transitions import EventData
from typing import Callable


class Observer:

    def state_changed(self, event_data: EventData):
        print(f"state is now '{event_data.state.name}'")


class SubscribableMachine(Machine):
    states = ['solid', 'liquid', 'gas']
    transitions = [
        {'trigger': 'heat', 'source': 'solid', 'dest': 'liquid'},
        {'trigger': 'heat', 'source': 'liquid', 'dest': 'gas'},
        {'trigger': 'cool', 'source': 'gas', 'dest': 'liquid'},
        {'trigger': 'cool', 'source': 'liquid', 'dest': 'solid'}
    ]

    def __init__(self):
        super().__init__(states=self.states, transitions=self.transitions,
                         initial='solid', after_state_change="notify",
                         send_event=True)
        self._subscriptions = []

    def notify(self, event_data: EventData):
        for func in self._subscriptions:
            func(event_data)

    def subscribe(self, func: Callable):
        self._subscriptions.append(func)


machine = SubscribableMachine()
observer = Observer()
machine.subscribe(observer.state_changed)
machine.heat()  # >>> state is now 'LIQUID'

如果您让观察者订阅特定的转换和/或状态事件,那么当您稍后重命名这些事件时,这显然会破坏他们的代码。然而,在我看来,仅仅传递事件会大大降低状态机和一般状态模式的有用性,因为它是状态模式中最好的部分之一,它摆脱了 if-elif-else-cascades。

理想情况下,我想做的是有一些我可以继承(或以其他方式)的监听器类,并且只在外部实现我需要监听的方法。

我会说您不需要特定的侦听器类。您可以直接将可调用对象添加到状态进入/退出回调。此外,您可以将字符串替换为 (string) Enums 作为状态标识符。这样,您可以更改 Enum 的值,而不会对观察者产生任何影响。这可以防止在订阅特定状态时出现拼写错误:

from transitions import Machine
from transitions import EventData
from typing import Callable
from enum import Enum, auto


class Observer:

    def state_changed(self, event_data: EventData):
        print(f"state is now '{event_data.state.name}'")


class State(Enum):
    SOLID = auto()
    LIQUID = auto()
    GAS = auto()


class SubscribableMachine(Machine):

    transitions = [
        {'trigger': 'heat', 'source': State.SOLID, 'dest': State.LIQUID},
        {'trigger': 'heat', 'source': State.LIQUID, 'dest': State.GAS},
        {'trigger': 'cool', 'source': State.GAS, 'dest': State.LIQUID},
        {'trigger': 'cool', 'source': State.LIQUID, 'dest': State.SOLID}
    ]

    def __init__(self):
        super().__init__(states=State, transitions=self.transitions,
                         initial=State.SOLID, send_event=True)

    def subscribe(self, func: Callable, state: State):
        self.get_state(state).on_enter.append(func)

    def unsubscribe(self, func: Callable, state: State):
        self.get_state(state).on_enter.remove(func)


machine = SubscribableMachine()
observer = Observer()
machine.subscribe(observer.state_changed, State.LIQUID)
machine.heat()  # >>> state is now 'LIQUID'
machine.heat()
assert machine.state == State.GAS
machine.unsubscribe(observer.state_changed, State.LIQUID)
machine.cool()  # no output
assert machine.state == State.LIQUID

以相同方式订阅特定转换的语法是什么?

对于转场,您可以使用machine.get_transitions(trigger, source, dest)来获取一组转场。正如文档中提到的(例如Callback execution order),转换具有两个回调事件:beforeafter. 如果您想在转换发生后(也State.enter被调用后)收到通知,您的订阅/取消订阅方法可能如下所示:

    def subscribe(self, func, trigger="", source="*", dest="*"):
        for transition in self.get_transitions(trigger, source, dest):
            transition.after.append(func)
            
    def unsubscribe(self, func, trigger="", source="*", dest="*"):
        for transition in self.get_transitions(trigger, source, dest):
            transition.after.remove(func)
# ...
machine.subscribe(observer.state_changed, "heat")
machine.heat()  >>> state is now 'LIQUID'
machine.heat()  >>> state is now 'GAS'

你可以before改为看看输出是如何state_changed变化的。此外,您可以通过sourcedestination进一步缩小范围:

machine.subscribe(observer.state_changed, "heat", source=State.LIQUID)
# ...
machine.heat()  >>> <nothing>
machine.heat()  >>> state is now 'GAS'

对于取消订阅,您需要记住过滤器设置或在list.remove尝试删除不在回调数组中的元素时捕获错误。

于 2021-07-06T10:56:54.007 回答