添加apply-sequence="true"
到 pub-sub 通道(这会将默认相关数据添加到消息中,包括correlationId
、sequenceSize
和 ,sequenceNumber
并允许在下游组件上使用默认策略)。
添加一个<aggregator/>
before并将两个safterThreadingProcessor
的输出路由到它。<service-activator/>
在聚合器之后添加一个<splitter/>
- 默认拆分器会将聚合器生成的集合拆分为两条消息。
afterThreadingProcessor
将为完成其工作的第二个线程上的每条消息调用一次。
您可以通过使用链来简化配置...
<chain input-channel="myOutputChannel">
<aggregator />
<splitter />
<service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>
</chain>
要对最终服务进行一次调用,只需将您的服务更改为采用 aCollection<?>
而不是添加拆分器。
编辑:
为了在评论#3中做你想做的事情(在原始线程上运行最终服务),这应该可以工作......
<int:channel id="foo" />
<int:service-activator ref="twoServicesGateway" input-channel="foo"
output-channel="myOutputChannel" />
<int:gateway id="twoServicesGateway" default-request-channel="myPubSub"/>
<int:publish-subscribe-channel id="myPubSub" task-executor="my10ThreadPool"
apply-sequence="true"/>
<int:service-activator input-channel="myPubSub" output-channel="aggregatorChannel"
ref="beanA" method="blah"/>
<int:service-activator input-channel="myPubSub" output-channel="aggregatorChannel"
ref="beanB" method="blah"/>
<int:aggregator input-channel="aggregatorChannel" />
<int:service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>
在这种情况下,网关封装了另外两个服务和聚合器;默认service-interface
是简单的RequestReplyExchanger
. 调用线程将等待输出。由于聚合器没有output-channel
,框架会将回复发送到网关,等待线程接收到,返回,<service-activator/>
然后将结果发送到最终服务。
您可能希望reply-timeout
在网关上放置 a,因为默认情况下,它将无限期等待,并且如果其中一项服务返回 null,则不会收到任何聚合响应。
请注意,我缩进了网关流只是为了显示它从网关运行,它们不是网关的子元素。