我对如何使用 Akka.net 应用 CQRS 和事件溯源感兴趣。我已经注意到提供 ES 部分的Akka.Persistence 。
据我了解,命令处理程序和 AggergateRoot 可以由单个类中的 ReceivePersistentActor 表示。我有一个基类(这里不完整)......
public abstract class AggregateRoot<TState> : ReceivePersistentActor
{
private readonly Func<TState, bool> shouldSafeSnapShot;
protected AggregateRoot(TState state, Func<TState, bool> shouldSafeSnapShot)
{
if (state == null) throw new ArgumentNullException(nameof(state));
if (shouldSafeSnapShot == null) throw new ArgumentNullException(nameof(shouldSafeSnapShot));
var path = this.Self.Path;
this.PersistenceId = $"{path.Parent.Name}/{path.Name}";
this.State = state;
this.shouldSafeSnapShot = shouldSafeSnapShot;
}
public sealed override string PersistenceId { get; }
protected TState State { get; }
protected void Emit<TEvent>(TEvent e, Action<TEvent> apply = null)
{
this.Persist(e,
@event =>
{
// update state
apply?.Invoke(@event);
// safe snapshot or so...
this.SaveSnapshotIfRequired();
// publish to event bus?
Context.System.EventStream.Publish(e);
});
}
private void SaveSnapshotIfRequired()
{
var state = this.State;
if (this.shouldSafeSnapShot(state))
{
this.SaveSnapshot(state);
}
}
}
我不想通过 EventBus 发送事件,Akka 似乎在它的包中带来了一些可能适合其他 CQRS 框架(NCqrs 中的 EventBus 抽象,Inceptum中的EventBus抽象)或 Gregory Young的简单示例的方式。
// publish to event bus?
Context.System.EventStream.Publish(e);
但是,在我看来,知道 AggregateRoot 域对象参与者中的事件总线的实现很奇怪。我可以将责任交给演员孩子或注入类似IAbstractEventBus接口之类的东西,用于切换实现,但我认为 AggregateRoot 不应该负责在持久化后发布事件并通知所有订阅者,对吧!?但是 ES 部分似乎与 Actor 耦合。
使用 Akka.net 和 Akka.Persistence 的典型方法是什么?任何设计想法如何拆分?
我偶然发现了 Akka.net 的IEventAdapter接口,但我不确定这是否能让我走上正确的道路(或黑暗的一面)......
public class SomeEventAdapter : IEventAdapter
{
public object ToJournal(object evt)
{
return evt; // should I add the event emitter here?
// but is this already stored now? hmmm
}
}