以下内容基于我自己的经验以及我对 Lokad.CQRS、NCQRS 等各种框架的实验。我确信有多种方法可以处理这个问题。我会发布对我最有意义的内容。
1.聚合创建:
每次命令处理程序需要聚合时,它都会使用存储库。存储库从事件存储中检索相应的事件列表并调用重载的构造函数,注入事件
var stream = eventStore.LoadStream(id)
var User = new User(stream)
如果聚合之前不存在,则流将为空,并且新创建的对象将处于其原始状态。您可能希望确保在此状态下只允许少数命令使聚合生效,例如User.Create()
.
2. 新事件的存储
命令处理发生在工作单元内。在命令执行期间,每个结果事件都将添加到聚合 ( User.Changes
) 内的列表中。执行完成后,更改将附加到事件存储中。在下面的示例中,这发生在以下行中:
store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
3. 事件顺序
想象一下,如果两个后续CustomerMoved
事件以错误的顺序重播,会发生什么。
一个例子
我将尝试用一段伪代码来说明(我故意将存储库问题留在命令处理程序中以显示幕后会发生什么):
申请服务:
UserCommandHandler
Handle(CreateUser cmd)
stream = store.LoadStream(cmd.UserId)
user = new User(stream.Events)
user.Create(cmd.UserName, ...)
store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
Handle(BlockUser cmd)
stream = store.LoadStream(cmd.UserId)
user = new User(stream.Events)
user.Block(string reason)
store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
总计的:
User
created = false
blocked = false
Changes = new List<Event>
ctor(eventStream)
isNewEvent = false
foreach (event in eventStream)
this.Apply(event, isNewEvent)
Create(userName, ...)
if (this.created) throw "User already exists"
isNewEvent = true
this.Apply(new UserCreated(...), isNewEvent)
Block(reason)
if (!this.created) throw "No such user"
if (this.blocked) throw "User is already blocked"
isNewEvent = true
this.Apply(new UserBlocked(...), isNewEvent)
Apply(userCreatedEvent, isNewEvent)
this.created = true
if (isNewEvent) this.Changes.Add(userCreatedEvent)
Apply(userBlockedEvent, isNewEvent)
this.blocked = true
if (isNewEvent) this.Changes.Add(userBlockedEvent)
更新:
作为旁注:Yves 的回答让我想起了几年前Udi Dahan的一篇有趣的文章: