1

在 Lagom 中,当命令处理程序必须执行一些异步操作时,您会怎么做?例如:

override def behavior = Actions().onCommand[MyCommand, Done] {
  case (cmd, ctx, state) =>
    // some complex code that performs asynchronous operations
    // (for example, querying other persistent entities or the read-side
    // by making calls that return Future[...] and composing those),
    // as summarized in a placeholder below:
    val events: Future[Seq[Event]] = ???
    events map {
      xs => ctx.thenPersistAll(xs: _*) { () => ctx.reply(Done) }
    }
}

这样的代码的问题是编译器期望命令处理程序返回Persist,而不是Future[Persist]

这是故意这样做的,以确保事件以正确的顺序保存(即,先前命令生成的事件必须在后面命令生成的事件之前保存)?但是,这不能通过对事件偏移量的适当管理来处理,以便日志总是正确地对它们进行排序,而不管它们何时实际保存?

当命令处理复杂到需要从命令处理程序进行异步调用时,在这种情况下该怎么办?

4

2 回答 2

3

邮件列表上有一个类似的问题,James 给出了答案。 https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!topic/lagom-framework/Z6lynjNTqgE

简而言之,您在 CQRS 应用程序中的实体是一个一致性边界,并且应该只依赖于它在其中立即可用的数据,而不是外部(不调用外部服务)。

您可能正在寻找所谓的Command Enrichment. 您收到一个请求,从外部服务收集一些数据并构建一个包含您需要发送到您的实体的所有内容的命令。

您当然不应该查询读取端以在写入端实体内做出业务决策。您也不应该对来自其他实体的数据做出业务决策。

您的实体应该能够做出所有决定,因为它是您模型的一致性边界。

于 2017-06-06T19:33:18.103 回答
1

在这些情况下,我一直在做的是将 PersistentEntityRef 传递给异步操作,以便它可以向实体及其那些命令处理程序(不是产生异步计算的处理程序)发出命令,然后持久化事件。

请记住,这些都不是原子的,因此您必须考虑如果异步操作在发出命令的中途失败或某些命令成功而某些命令失败等会发生什么。大概您需要一些系统重试机制失败。如果您将命令处理程序构建为幂等的,它将帮助您处理重复项。

于 2017-06-07T08:25:05.847 回答