我正在使用 NServiceBus 构建一个处理管道,但我在配置分发服务器时遇到了问题,以便使流程中的每个步骤都具有可扩展性。这里有一些信息:
- 管道将有一个主进程,对 WorkItem 说“好的,该开始了”,然后它将启动一个类似流程图的进程。
- 流程图中的每个步骤都可能在计算上很昂贵,因此我希望能够扩展每个步骤。这告诉我每个步骤都需要一个 Distributor。
- 我希望以后能够将其他活动与事件挂钩。这告诉我完成后我需要 Publish() 消息,而不是 Send() 它们。
- 流程可能需要根据条件进行分支。这告诉我一个进程必须能够发布不止一种类型的消息。
- 一个进程可能需要加入分叉。我想我应该为此使用 Sagas。
希望这些假设是好的,否则我遇到的麻烦比我想象的要多。
为了简单起见,让我们忘记分叉或加入,考虑一个简单的管道,步骤 A 后跟步骤 B,并以步骤 C 结束。每个步骤都有自己的分发器,并且可以有许多节点处理消息。
- NodeA worker 包含一个 IHandleMessages 处理器,并发布 EventA
- NodeB worker 包含一个 IHandleMessages 处理器,并发布 Event B
- NodeC worker 包含一个 IHandleMessages 处理器,然后管道就完成了。
以下是配置文件的相关部分,其中#表示worker的数量,(即有输入队列NodeA.1和NodeA.2):
NodeA:
<MsmqTransportConfig InputQueue="NodeA.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeA.Distrib.Control" DistributorDataAddress="NodeA.Distrib.Data" >
<MessageEndpointMappings>
</MessageEndpointMappings>
</UnicastBusConfig>
NodeB:
<MsmqTransportConfig InputQueue="NodeB.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeB.Distrib.Control" DistributorDataAddress="NodeB.Distrib.Data" >
<MessageEndpointMappings>
<add Messages="Messages.EventA, Messages" Endpoint="NodeA.Distrib.Data" />
</MessageEndpointMappings>
</UnicastBusConfig>
NodeC:
<MsmqTransportConfig InputQueue="NodeC.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeC.Distrib.Control" DistributorDataAddress="NodeC.Distrib.Data" >
<MessageEndpointMappings>
<add Messages="Messages.EventB, Messages" Endpoint="NodeB.Distrib.Data" />
</MessageEndpointMappings>
</UnicastBusConfig>
以下是分销商配置的相关部分:
Distributor A:
<add key="DataInputQueue" value="NodeA.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeA.Distrib.Control"/>
<add key="StorageQueue" value="NodeA.Distrib.Storage"/>
Distributor B:
<add key="DataInputQueue" value="NodeB.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeB.Distrib.Control"/>
<add key="StorageQueue" value="NodeB.Distrib.Storage"/>
Distributor C:
<add key="DataInputQueue" value="NodeC.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeC.Distrib.Control"/>
<add key="StorageQueue" value="NodeC.Distrib.Storage"/>
我正在使用每个节点的 2 个实例进行测试,问题似乎出现在节点 B 的中间。基本上可能发生两件事:
- Node B 的两个实例都报告它正在订阅 EventA,并且 NodeC.Distrib.Data@MYCOMPUTER 正在订阅 Node B 发布的 EventB。在这种情况下,一切都很好。
- Node B 的两个实例都报告它正在订阅 EventA,但是,一个工作人员说 NodeC.Distrib.Data@MYCOMPUTER 正在订阅 TWICE,而另一个工作人员没有提到它。
在第二种情况下,它似乎只受分发者路由订阅消息的方式控制,如果“overachiever”节点处理一个 EventA,那么一切都很好。如果“成绩不佳者”处理 EventA,则 EventB 的发布没有订阅者并且工作流终止。
所以,我的问题:
- 这种设置可以吗?
- 配置是否正确?除了简单的一级发布者/2-worker 设置之外,很难找到任何关于分发者的配置示例。
- 让一个中央代理进程来执行所有非计算密集型的交通警察操作,并且仅在任务长时间运行且必须进行负载平衡时才将消息发送到分发器后面的进程是否更有意义?
- 然后负载平衡的节点可以简单地回复中央代理,这似乎更容易。
- 另一方面,这似乎与 NServiceBus 的权力下放不符。
- 如果这是答案,并且长时间运行的流程的 done 事件是一个回复,那么您如何保留 Publish 以启用以后对已发布事件的可扩展性?