1

Commanded.ProcessManagers.ProcessManager在使用事件溯源/CQRS 框架 Commanded 中的模块实现 saga 模式时,我遇到了一个问题。

在发票上下文中,我需要为发票实施批量创建机制。这种大规模的创造既是一个聚合体,也是一个传奇。聚合允许开始和完成批量创建。saga 通过发出命令来创建发票并将它们的 ID 保持在 saga 状态来对“mass creation started”事件做出反应。之后,saga 通过监听它命令存在的发票实例的成功或失败事件来跟踪发票创建的状态。一旦每个发票实例报告成功或失败,saga 应该发出命令停止大规模创建。

为此,跟踪每个发票实例及其当前状态会很有帮助in progresscreatedfailed。我尝试在apply回调中实现这一点,原则上效果很好。

现在的问题是,apply回调总是在handle回调之后调用。因此 saga 状态会在 saga 应该做出反应之后更新。这似乎违反直觉,因此,handle回调中可用的状态不能用于正确反应。

在我看来,saga 模式在很多方面都是聚合模式的反转。虽然首先将命令处理成域事件然后将此域事件应用于状态以在聚合的情况下很有用,但我认为在 saga 的情况下,域事件是已经发生的事情的文档, 应该在尝试对其做出反应之前应用于状态。

现在我的问题是:有没有办法先配置 Commandedapplyhandle配置Commanded.ProcessManagers.ProcessManager模块?或者这实际上是一个错误并且需要一般修复?

4

1 回答 1

1

之后调用apply/2回调handle/2是设计使然,不可能将 Commanded 配置为不同的行为。

我同意您的推理,即在尝试处理事件以生成任何命令之前将事件应用于进程管理器的状态更有意义。这似乎是对 Commanded 进行的一项有价值的更改,可以通过您已经提出的问题 ( #176 ) 进行跟踪。

同时,您可以按如下方式实现您的流程管理器(saga):

defmodule InvoicingProcessManager do
  use Commanded.ProcessManagers.ProcessManager,
    name: __MODULE__,
    router: InvoicingRouter

  defstruct [
    :batch_uuid,
    pending_invoice_ids: MapSet.new()
  ]

  def interested?(%InvoiceBatchStarted{batch_uuid: batch_uuid}), do: {:start, batch_uuid}
  def interested?(%InvoiceCreated{batch_uuid: batch_uuid}), do: {:continue, batch_uuid}
  def interested?(%InvoiceFailed{batch_uuid: batch_uuid}), do: {:continue, batch_uuid}
  def interested?(%InvoiceBatchStopped{batch_uuid: batch_uuid}), do: {:stop, batch_uuid}
  def interested?(_event), do: false

  # Event handlers

  def handle(%InvoicingSaga{}, %InvoiceBatchStarted{} = started) do
    %InvoiceBatchStarted{batch_uuid: batch_uuid, invoice_ids: invoice_ids} = started

    Enum.map(invoice_ids, fn invoice_id ->
      %CreateInvoice{
        invoice_id: invoice_id,
        batch_uuid: batch_uuid
      }
    end)
  end

  def handle(%InvoicingSaga{}, %InvoiceCreated{invoice_id: invoice_id}),
    do: attempt_stop_batch(pm, invoice_id)

  def handle(%InvoicingSaga{}, %InvoiceFailed{invoice_id: invoice_id}),
    do: attempt_stop_batch(pm, invoice_id)

  ## State mutators

  def apply(%InvoicingSaga{} = pm, %InvoiceBatchStarted{} = started) do
    %InvoiceBatchStarted{batch_uuid: batch_uuid, invoice_ids: invoice_ids} = started

    %InvoicingSaga{
      transfer
      | batch_uuid: batch_uuid,
        pending_invoice_ids: MapSet.new(invoice_ids)
    }
  end

  def apply(%InvoicingSaga{} = pm, %InvoiceCreated{invoice_id: invoice_id}) do
    %InvoicingSaga{pm | pending_invoice_ids: invoice_completed(pm, invoice_id)}
  end

  def apply(%InvoicingSaga{} = pm, %InvoiceFailed{invoice_id: invoice_id}) do
    %InvoicingSaga{pm | pending_invoice_ids: invoice_completed(pm, invoice_id)}
  end

  ## Private helpers

  def attempt_stop_batch(%InvoicingSaga{batch_uuid: batch_uuid} = pm, invoice_id) do
    pending_invoices = invoice_completed(pm, invoice_id)

    case empty?(pending_invoices) do
      true -> %StopInvoiceBatch{batch_uuid: batch_uuid}
      false -> []
    end
  end

  defp invoice_completed(%InvoicingSaga{pending_invoice_ids: pending_invoice_ids}, invoice_id) do
    MapSet.delete(pending_invoice_ids, invoice_id)
  end

  defp empty?(map_set, empty \\ MapSet.new())
  defp empty?(%MapSet{} = empty, %MapSet{} = empty), do: true
  defp empty?(%MapSet{}, %MapSet{}), do: false
end
于 2018-06-21T11:54:26.573 回答