41

我想了解基于 CQRS 的系统中命令处理程序、聚合、存储库和事件存储之间关系的一些细节。

到目前为止我所理解的:

  • 命令处理程序从总线接收命令。他们负责从存储库中加载适当的聚合并调用聚合上的域逻辑。完成后,他们从总线上删除命令。
  • 聚合提供行为和内部状态。国家从不公开。改变状态的唯一方法是使用行为。建模此行为的方法从命令的属性创建事件,并将这些事件应用到聚合,然后调用相应地设置内部状态的事件处理程序。
  • 存储库仅允许在给定 ID 上加载聚合,并添加新聚合。基本上,存储库将域连接到事件存储。
  • 最后但并非最不重要的一点是,事件存储负责将事件存储到数据库(或使用的任何存储),并将这些事件重新加载为所谓的事件流。

到现在为止还挺好。现在有一些我还没有得到的问题:

  • 如果命令处理程序要在现有聚合上调用行为,那么一切都很容易。命令处理程序获取对存储库的引用,调用其 loadById 方法并返回聚合。但是,当还没有聚合但应该创建聚合时,命令处理程序会做什么?据我了解,以后应该使用这些事件重建聚合。这意味着聚合的创建是为了响应 fooCreated 事件而完成的。但是为了能够存储任何事件(包括 fooCreated 事件),我需要一个聚合。所以这在我看来就像一个先有鸡还是先有蛋的问题:没有事件我无法创建聚合,但应该创建事件的唯一组件是聚合。所以基本上它归结为:我如何创建新的聚合,谁做什么?
  • 当聚合触发事件时,内部事件处理程序会响应它(通常通过应用方法调用)并更改聚合的状态。这个事件是如何移交给存储库的?谁发起了“请将新事件发送到存储库/事件存储”操作?聚合体本身?通过观察聚合存储库?订阅内部事件的其他人?...?
  • 最后但并非最不重要的一点是,我在正确理解事件流的概念时遇到了问题:在我的想象中,它就像一个有序的事件列表。重要的是它是“有序的”。这是正确的吗?
4

3 回答 3

41

以下内容基于我自己的经验以及我对 Lokad.CQRS、NCQRS 等各种框架的实验。我确信有多种方法可以处理这个问题。我会发布对我最有意义的内容。

1.聚合创建:

每次命令处理程序需要聚合时,它都会使用存储库。存储库从事件存储中检索相应的事件列表并调用重载的构造函数,注入事件

var stream = eventStore.LoadStream(id)
var User = new User(stream)

如果聚合之前不存在,则流将为空,并且新创建的对象将处于其原始状态。您可能希望确保在此状态下只允许少数命令使聚合生效,例如User.Create().

2. 新事件的存储

命令处理发生在工作单元内。在命令执行期间,每个结果事件都将添加到聚合 ( User.Changes) 内的列表中。执行完成后,更改将附加到事件存储中。在下面的示例中,这发生在以下行中:

store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

3. 事件顺序

想象一下,如果两个后续CustomerMoved事件以错误的顺序重播,会发生什么。

一个例子

我将尝试用一段伪代码来说明(我故意将存储库问题留在命令处理程序中以显示幕后会发生什么):

申请服务:

UserCommandHandler
    Handle(CreateUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Create(cmd.UserName, ...)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

    Handle(BlockUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Block(string reason)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

总计的:

User
    created = false
    blocked = false

    Changes = new List<Event>

    ctor(eventStream)
        isNewEvent = false
        foreach (event in eventStream)
            this.Apply(event, isNewEvent)

    Create(userName, ...)
        if (this.created) throw "User already exists"
        isNewEvent = true
        this.Apply(new UserCreated(...), isNewEvent)

    Block(reason)
        if (!this.created) throw "No such user"
        if (this.blocked) throw "User is already blocked"
        isNewEvent = true
        this.Apply(new UserBlocked(...), isNewEvent)

    Apply(userCreatedEvent, isNewEvent)
        this.created = true
        if (isNewEvent) this.Changes.Add(userCreatedEvent)

    Apply(userBlockedEvent, isNewEvent)
        this.blocked = true
        if (isNewEvent) this.Changes.Add(userBlockedEvent)

更新:

作为旁注:Yves 的回答让我想起了几年前Udi Dahan的一篇有趣的文章:

于 2012-09-11T08:04:44.807 回答
12

丹尼斯优秀答案的一个小变化:

  • 在处理“创造性”用例(即应该衍生出新的聚合)时,尝试找到另一个可以将责任转移到的聚合或工厂。这与拥有一个将事件进行水合的 ctor (或任何其他为此再水合的机制)并不冲突。有时工厂只是一个静态方法(适用于“上下文”/“意图”捕获),有时它是另一个聚合的实例方法(“数据”继承的好地方),有时它是一个显式工厂对象(“复杂”的创建逻辑)。
  • 我喜欢在我的聚合上提供一个显式的 GetChanges() 方法,该方法将内部列表作为数组返回。如果我的聚合在一次执行后仍保留在内存中,我还添加了一个 AcceptChanges() 方法来指示应清除内部列表(通常在将事物刷新到事件存储后调用)。您可以在此处使用基于拉(GetChanges/Changes)或推送(想想 .net 事件或 IObservable)的模型。很大程度上取决于事务语义、技术、需求等......
  • 您的事件流是一个链接列表。每个版本(事件/变更集)都指向前一个版本(也就是父版本)。您的事件流是发生在特定聚合上的一系列事件/更改。顺序仅在聚合边界内得到保证。
于 2012-09-11T08:26:17.117 回答
1

几乎同意 yves-reynhout 和 dennis-traub 的观点,但我想向您展示我是如何做到这一点的。我想剥夺我的聚合体将事件应用到自己身上或重新补充自己的责任;否则会有很多代码重复:每个聚合构造函数看起来都一样:

UserAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


OrderAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


ProfileAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)

这些责任可以留给指挥调度员。该命令由聚合直接处理。

Command dispatcher class

    dispatchCommand(command) method:
        newEvents = ConcurentProofFunctionCaller.executeFunctionUntilSucceeds(tryToDispatchCommand)
        EventDispatcher.dispatchEvents(newEvents)

    tryToDispatchCommand(command) method:
        aggregateClass = CommandSubscriber.getAggregateClassForCommand(command)
        aggregate = AggregateRepository.loadAggregate(aggregateClass, command.getAggregateId())
        newEvents = CommandApplier.applyCommandOnAggregate(aggregate, command)
        AggregateRepository.saveAggregate(command.getAggregateId(), aggregate, newEvents)

ConcurentProofFunctionCaller class

    executeFunctionUntilSucceeds(pureFunction) method:
        do this n times
            try
                call result=pureFunction()
                return result
            catch(ConcurentWriteException)
                continue
        throw TooManyRetries    

AggregateRepository class

     loadAggregate(aggregateClass, aggregateId) method:
         aggregate = new aggregateClass
         priorEvents = EventStore.loadEvents()
         this.applyEventsOnAggregate(aggregate, priorEvents)

     saveAggregate(aggregateId, aggregate, newEvents)
        this.applyEventsOnAggregate(aggregate, newEvents)
        EventStore.saveEventsForAggregate(aggregateId, newEvents, priorEvents.version)

SomeAggregate class
    handleCommand1(command1) method:
        return new SomeEvent or throw someException BUT don't change state!
    applySomeEvent(SomeEvent) method:
        changeStateSomehow() and not throw any exception and don't return anything!

请记住,这是从 PHP 应用程序投射的伪代码;真正的代码应该在其他类中注入东西并重构其他职责。想法是保持聚合尽可能干净并避免代码重复。

关于聚合的一些重要方面:

  1. 命令处理程序不应更改状态;他们产生事件或抛出异常
  2. event apply 不应该抛出任何异常,也不应该返回任何东西;他们只改变内部状态

可以在这里找到一个开源的 PHP 实现。

于 2016-09-23T07:28:26.787 回答