1

我对如何使用 Akka.net 应用 CQRS 和事件溯源感兴趣。我已经注意到提供 ES 部分的Akka.Persistence 。

据我了解,命令处理程序和 AggergateRoot 可以由单个类中的 ReceivePersistentActor 表示。我有一个基类(这里不完整)......

public abstract class AggregateRoot<TState> : ReceivePersistentActor
{
    private readonly Func<TState, bool> shouldSafeSnapShot;

    protected AggregateRoot(TState state, Func<TState, bool> shouldSafeSnapShot)
    {
        if (state == null) throw new ArgumentNullException(nameof(state));
        if (shouldSafeSnapShot == null) throw new ArgumentNullException(nameof(shouldSafeSnapShot));

        var path = this.Self.Path;
        this.PersistenceId = $"{path.Parent.Name}/{path.Name}";
        this.State = state;
        this.shouldSafeSnapShot = shouldSafeSnapShot;
    }

    public sealed override string PersistenceId { get; }

    protected TState State { get; }

    protected void Emit<TEvent>(TEvent e, Action<TEvent> apply = null)
    {
        this.Persist(e,
            @event =>
                {
                    // update state
                    apply?.Invoke(@event);

                    // safe snapshot or so...
                    this.SaveSnapshotIfRequired();

                    // publish to event bus?
                    Context.System.EventStream.Publish(e);
                });
    }

    private void SaveSnapshotIfRequired()
    {
        var state = this.State;

        if (this.shouldSafeSnapShot(state))
        {
            this.SaveSnapshot(state);
        }
    }
}

我不想通过 EventBus 发送事件,Akka 似乎在它的包中带来了一些可能适合其他 CQRS 框架(NCqrs 中的 EventBus 抽象,Inceptum中的EventBus抽象)或 Gregory Young简单示例的方式。

// publish to event bus?
Context.System.EventStream.Publish(e);

但是,在我看来,知道 AggregateRoot 域对象参与者中的事件总线的实现很奇怪。我可以将责任交给演员孩子或注入类似IAbstractEventBus接口之类的东西,用于切换实现,但我认为 AggregateRoot 不应该负责在持久化后发布事件并通知所有订阅者,对吧!?但是 ES 部分似乎与 Actor 耦合。

使用 Akka.net 和 Akka.Persistence 的典型方法是什么?任何设计想法如何拆分?

我偶然发现了 Akka.net 的IEventAdapter接口,但我不确定这是否能让我走上正确的道路(或黑暗的一面)......

public class SomeEventAdapter : IEventAdapter
{
    public object ToJournal(object evt)
    {
        return evt; // should I add the event emitter here?
        // but is this already stored now? hmmm
    }
}
4

2 回答 2

2

在理想主义的世界中,您的命令处理程序可以负责跟踪事件以推送到事件总线并将它们持久化到某种形式的存储,一旦命令被完全处理而没有任何问题 - 这确保您不会在事件之前过早推送事件聚合方法已经完成了它应该引发的所有事件。

在 Akka.net 中,您可以在将命令传递给聚合时将 Tell 更改为 Ask(假设为了纯度起见,您将处理程序与聚合分开)并让聚合返回它想要引发并持久化的事件处理程序,因此处理程序然后可以发送和持久化它们(可能间接地本身......也许通过 UOW 上的某种形式)......但这意味着在您的聚合之前无法处理命令处理程序配置为处理的其他命令已经完成......即使其他聚合可能由一个处理程序提供服务 - 毕竟询问锁...... Tell 没有。

在其他一些系统中,持久层本身可以充当事件总线。Greg Young 的 EventStore 就是其中之一……您完全不用担心您的应用程序)。

这是理想主义理论与现实世界实施和框架限制的挑战。

在一个理想主义的世界中,聚合根不关心存储它引发的事件......它的目的(有人会说)是从提供的事件集合中水合自己,引发事件并让其他组件管理广播、持久性和查询(用于补水)这些事件。我知道 Akka.net 为其参与者提供了一个很好的数据库访问/持久层......这很好......但确实偏离了纯 SOLID CQRS 实现。因此,您可以开始在您的实施中搅浑水,这就是您所在的位置。

这远非理想,但一种选择

    protected virtual void Execute<TCommand>(Action<TCommand> action, TCommand command)
    {
        UnitOfWork.Add(this);
        try
        {
            action(command);

            UnitOfWork.Commit();

            Sender.Tell(true, Self);
        }
        catch(Exception exception)
        {
            Logger.LogError("Executing an Akka.net request failed.", exception: exception);
            Sender.Tell(false, Self);
            throw;
        }
    }

然后可以通过

    Receive<SayHelloWorldCommand>(command => Execute(SayHello, command));

这取自 cqrs.net 在聚合中使用工作单元 (UOW),而不是将其传递回处理程序。这是他们做出的妥协,以至少将上述问题直接排除在聚合之外,并保持某种形式的 SOLID 实现。

于 2017-02-14T05:16:01.460 回答
1

@Beachwalker 我在https://github.com/thangchung/magazine-website-akka有使用 CQRS 方法的实现。不知道能不能帮到你?但希望你会发现一些有用的东西。我将继续为它添加更多功能和特性。

于 2017-03-08T15:48:06.273 回答