我已经编写了一个事件源聚合,现在实现了一个事件源传奇......我注意到两者很相似,并创建了一个事件源对象作为两者派生的基类。
我在这里看过一个演示http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/但感觉可能存在问题,因为命令可能会丢失由于发送命令在写事务之外而导致进程崩溃?
public void Save(ISaga saga)
{
var events = saga.GetUncommittedEvents();
eventStore.Write(new UncommittedEventStream
{
Id = saga.Id,
Type = saga.GetType(),
Events = events,
ExpectedVersion = saga.Version - events.Count
});
foreach (var message in saga.GetUndispatchedMessages())
bus.Send(message); // can be done in different ways
saga.ClearUncommittedEvents();
saga.ClearUndispatchedMessages();
}
相反,我使用的是 Greg Young 的 EventStore,当我保存 EventSourcedObject(聚合或 saga)时,顺序如下:
- Repository 获取新的 MutatingEvents 列表。
- 将它们写入流。
- 当流被写入并提交到流时,EventStore 会触发新事件。
- 我们监听来自 EventStore 的事件并在 EventHandlers 中处理它们。
我正在实现一个传奇的两个方面:
- 接收事件,这些事件可能会转换状态,而状态又可能会发出命令。
- 发出警报,在将来的某个时间点(通过外部计时器服务)我们可以被回调)。
问题
据我了解,事件处理程序不应发出命令(如果命令失败会发生什么?) - 但我可以接受上述情况,因为 Saga 是通过此事件代理控制命令创建(对事件的反应)的实际事物,并且任何命令发送失败都可以在外部处理(在外部事件处理程序中
CommandEmittedFromSaga
,如果命令失败则处理并重新发送)?还是我忘记了包装事件并将本机存储
Commands
在Events
同一个流中(与基类 Message 混合 - Saga 会同时使用命令和事件,聚合只会使用事件)?网络上还有其他用于实现事件源 Sagas 的参考资料吗?有什么我可以理智地检查我的想法的吗?
一些背景代码如下。
Saga 发出运行命令(包装在 CommandEmittedFromSaga 事件中)
下面的命令包含在事件中:
public class CommandEmittedFromSaga : Event
{
public readonly Command Command;
public readonly Identity SagaIdentity;
public readonly Type SagaType;
public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
{
Command = command;
SagaType = sagaType;
SagaIdentity = sagaIdentity;
}
}
Saga 在未来某个时间点请求回调(AlarmRequestedBySaga 事件)
警报回调请求被包装在一个事件中,并将在请求的时间或之后向 Saga 回火和事件:
public class AlarmRequestedBySaga : Event
{
public readonly Event Event;
public readonly DateTime FireOn;
public readonly Identity Identity;
public readonly Type SagaType;
public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
{
Identity = identity;
SagaType = sagaType;
Event = @event;
FireOn = fireOn;
}
}
或者,我可以将命令和事件存储在同一基本类型消息流中
public abstract class EventSourcedSaga
{
protected EventSourcedSaga() { }
protected EventSourcedSaga(Identity id, IEnumerable<Message> messages)
{
Identity = id;
if (messages == null) throw new ArgumentNullException(nameof(messages));
var count = 0;
foreach (var message in messages)
{
var ev = message as Event;
var command = message as Command;
if(ev != null) Transition(ev);
else if(command != null) _messages.Add(command);
else throw new Exception($"Unsupported message type {message.GetType()}");
count++;
}
if (count == 0)
throw new ArgumentException("No messages provided");
// All we need to know is the original number of events this
// entity has had applied at time of construction.
_unmutatedVersion = count;
_constructing = false;
}
readonly IEventDispatchStrategy _dispatcher = new EventDispatchByReflectionStrategy("When");
readonly List<Message> _messages = new List<Message>();
readonly int _unmutatedVersion;
private readonly bool _constructing = true;
public readonly Identity Identity;
public IList<Message> GetMessages()
{
return _messages.ToArray();
}
public void Transition(Event e)
{
_messages.Add(e);
_dispatcher.Dispatch(this, e);
}
protected void SendCommand(Command c)
{
// Don't add a command whilst we are in the constructor. Message
// state transition during construction must not generate new
// commands, as those command will already be in the message list.
if (_constructing) return;
_messages.Add(c);
}
public int UnmutatedVersion() => _unmutatedVersion;
}