8

我已经编写了一个事件源聚合,现在实现了一个事件源传奇......我注意到两者很相似,并创建了一个事件源对象作为两者派生的基类。

我在这里看过一个演示http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/但感觉可能存在问题,因为命令可能会丢失由于发送命令在写事务之外而导致进程崩溃?

public void Save(ISaga saga)
{
    var events = saga.GetUncommittedEvents();
    eventStore.Write(new UncommittedEventStream
    {
        Id = saga.Id,
        Type = saga.GetType(),
        Events = events,
        ExpectedVersion = saga.Version - events.Count
    });

    foreach (var message in saga.GetUndispatchedMessages())
        bus.Send(message); // can be done in different ways

    saga.ClearUncommittedEvents();
    saga.ClearUndispatchedMessages();
}

相反,我使用的是 Greg Young 的 EventStore,当我保存 EventSourcedObject(聚合或 saga)时,顺序如下:

  1. Repository 获取新的 MutatingEvents 列表。
  2. 将它们写入流。
  3. 当流被写入并提交到流时,EventStore 会触发新事件。
  4. 我们监听来自 EventStore 的事件并在 EventHandlers 中处理它们。

我正在实现一个传奇的两个方面:

  1. 接收事件,这些事件可能会转换状态,而状态又可能会发出命令
  2. 发出警报,在将来的某个时间点(通过外部计时器服务)我们可以被回调)。

问题

  1. 据我了解,事件处理程序不应发出命令(如果命令失败会发生什么?) - 但我可以接受上述情况,因为 Saga 是通过此事件代理控制命令创建(对事件的反应)的实际事物,并且任何命令发送失败都可以在外部处理(在外部事件处理程序中CommandEmittedFromSaga,如果命令失败则处理并重新发送)?

  2. 还是我忘记了包装事件并将本机存储CommandsEvents同一个流中(与基类 Message 混合 - Saga 会同时使用命令和事件,聚合只会使用事件)?

  3. 网络上还有其他用于实现事件源 Sagas 的参考资料吗?有什么我可以理智地检查我的想法的吗?

一些背景代码如下。

Saga 发出运行命令(包装在 CommandEmittedFromSaga 事件中)

下面的命令包含在事件中:

public class CommandEmittedFromSaga : Event
{
    public readonly Command Command;
    public readonly Identity SagaIdentity;
    public readonly Type SagaType;

    public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
    {
        Command = command;
        SagaType = sagaType;
        SagaIdentity = sagaIdentity;
    }
}

Saga 在未来某个时间点请求回调(AlarmRequestedBySaga 事件)

警报回调请求被包装在一个事件中,并将在请求的时间或之后向 Saga 回火和事件:

public class AlarmRequestedBySaga : Event
{
    public readonly Event Event;
    public readonly DateTime FireOn;
    public readonly Identity Identity;
    public readonly Type SagaType;

    public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
    {
        Identity = identity;
        SagaType = sagaType;
        Event = @event;
        FireOn = fireOn;
    }
}

或者,我可以将命令和事件存储在同一基本类型消息流中

public abstract class EventSourcedSaga
{
    protected EventSourcedSaga() { }

    protected EventSourcedSaga(Identity id, IEnumerable<Message> messages)
    {
        Identity = id;

        if (messages == null) throw new ArgumentNullException(nameof(messages));

        var count = 0;

        foreach (var message in messages)
        {
            var ev = message as Event;
            var command = message as Command;

            if(ev != null) Transition(ev);
            else if(command != null) _messages.Add(command);
            else throw new Exception($"Unsupported message type {message.GetType()}");

            count++;
        }

        if (count == 0)
            throw new ArgumentException("No messages provided");

        // All we need to know is the original number of events this
        // entity has had applied at time of construction.
        _unmutatedVersion = count;
        _constructing = false;
    }

    readonly IEventDispatchStrategy _dispatcher = new EventDispatchByReflectionStrategy("When");
    readonly List<Message> _messages = new List<Message>();
    readonly int _unmutatedVersion;
    private readonly bool _constructing = true;
    public readonly Identity Identity;

    public IList<Message> GetMessages()
    {
        return _messages.ToArray();
    }

    public void Transition(Event e)
    {
        _messages.Add(e);
        _dispatcher.Dispatch(this, e);
    }

    protected void SendCommand(Command c)
    {
        // Don't add a command whilst we are in the constructor. Message
        // state transition during construction must not generate new
        // commands, as those command will already be in the message list.
        if (_constructing) return;

        _messages.Add(c);
    }

    public int UnmutatedVersion() => _unmutatedVersion;
}
4

1 回答 1

8

我相信前两个问题是对流程管理器(又名 Sagas,见底部术语注释)的错误理解的结果。

转变你的想法

似乎您正在尝试将其建模(就像我曾经做过的那样)作为逆聚合。问题在于:聚合的“社会契约”是它的输入(命令)可以随时间变化(因为系统必须能够随时间变化),但它的输出(事件)不能。一旦写入,事件就是历史问题,系统必须始终能够处理它们。有了这个条件,就可以从不可变的事件流中可靠地加载聚合。

如果您尝试仅将输入和输出反转为流程管理器实现,那么它的输出就不能成为记录问题,因为随着时间的推移,命令可能会被弃用并从系统中删除。当您尝试使用已删除的命令加载流时,它将崩溃。因此,建模为反向聚合的流程管理器无法可靠地从不可变消息流中重新加载。(嗯,我敢肯定你可以设计一种方法......但它是明智的吗?)

因此,让我们通过查看它所取代的内容来考虑实现流程管理器。以管理订单履行等流程的员工为例。您为该用户做的第一件事是在 UI 中设置一个视图供他们查看。您要做的第二件事是在 UI 中制作按钮,以便用户执行操作以响应他们在视图上看到的内容。前任。“这一行有PaymentFailed,所以我点击CancelOrder。这一行有PaymentSucceededOrderItemOutOfStock,所以我点击ChangeToBackOrder。这个订单是Pending1 天前的,所以我点击FlagOrderForReview“......等等。一旦决策过程定义明确并且开始需要用户太多时间,您的任务就是自动化这个过程。为了自动化它,其他一切都可以保持不变(视图,甚至一些用户界面,以便您可以检查它),但用户已更改为一段代码。

“走开,不然我用一个很小的shell脚本代替你。”

进程管理器代码现在定期读取视图,如果存在某些数据条件,则可以发出命令。本质上,流程管理器的最简单版本是一些在计时器上运行的代码(例如每小时)并依赖于特定的视图。这就是我要开始的地方......使用您已经拥有的东西(视图/视图更新程序)和最少的添加(定期运行的代码)。即使您稍后决定对某些用例需要不同的功能,“未来的您”也会更好地了解需要解决的具体缺点。

这是一个提醒你Gall 定律和 YAGNI 的好地方。

  1. 网络上还有其他用于实现事件源 Sagas 的参考资料吗?有什么我可以理智地检查我的想法的吗?

很难找到好的材料,因为这些概念具有非常可塑性的实现,并且有各种各样的示例,其中许多为通用目的而过度设计。但是,这是我在答案中使用的一些参考资料。

DDD - 不断发展的业务流程
DDD/CQRS Google Group(大量阅读材料)


请注意,术语 Saga 与 Process Manager 具有不同的含义。一个常见的 saga 实现基本上是一个路由单,每个步骤及其相应的故障补偿都包含在单上。这取决于路由单的每个接收器执行路由单上指定的操作并成功地将其传递到下一跳或执行故障补偿和向后路由。在处理由不同组管理的多个系统时,这可能有点过于乐观,因此通常使用流程管理器来代替。有关更多信息,请参阅此 SO 问题

于 2015-11-06T20:07:46.770 回答