2

我有一个 Spring Integration 项目,我想通过多个操作同时处理一条消息。所以我设置了一个publish-subscribe-channel和一个task-executor。但是我想在继续之前等待所有处理完成。我该怎么做?

<publish-subscribe-channel id="myPubSub" task-executor="my10ThreadPool"/>

<channel id="myOutputChannel"/>

<service-activator input-channel="myPubSub" output-channel="myOutputChannel"
         ref="beanA" method="blah"/>
<service-activator input-channel="myPubSub" output-channel="myOutputChannel"
         ref="beanB" method="blah"/>

<service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>

因此,在上述情况下,我希望afterThreadingProcessor在两者之后只调用一次mybeanAbeanB完成它们的工作。但是,在上面afterThreadingProcessor会被调用两次。

4

2 回答 2

4
  1. 添加apply-sequence="true"到 pub-sub 通道(这会将默认相关数据添加到消息中,包括correlationIdsequenceSize和 ,sequenceNumber并允许在下游组件上使用默认策略)。

  2. 添加一个<aggregator/>before并将两个safterThreadingProcessor的输出路由到它。<service-activator/>

  3. 在聚合器之后添加一个<splitter/>- 默认拆分器会将聚合器生成的集合拆分为两条消息。

afterThreadingProcessor将为完成其工​​作的第二个线程上的每条消息调用一次。

您可以通过使用链来简化配置...

<chain input-channel="myOutputChannel">
    <aggregator />
    <splitter />
    <service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>
</chain>

要对最终服务进行一次调用,只需将您的服务更改为采用 aCollection<?>而不是添加拆分器。

编辑:

为了在评论#3中做你想做的事情(在原始线程上运行最终服务),这应该可以工作......

<int:channel id="foo" />
<int:service-activator ref="twoServicesGateway" input-channel="foo"
      output-channel="myOutputChannel" />

<int:gateway id="twoServicesGateway" default-request-channel="myPubSub"/>
    <int:publish-subscribe-channel id="myPubSub" task-executor="my10ThreadPool"
            apply-sequence="true"/>
    <int:service-activator input-channel="myPubSub" output-channel="aggregatorChannel"
         ref="beanA" method="blah"/>
    <int:service-activator input-channel="myPubSub" output-channel="aggregatorChannel"
         ref="beanB" method="blah"/>
    <int:aggregator input-channel="aggregatorChannel" />

<int:service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>

在这种情况下,网关封装了另外两个服务和聚合器;默认service-interface是简单的RequestReplyExchanger. 调用线程将等待输出。由于聚合器没有output-channel,框架会将回复发送到网关,等待线程接收到,返回,<service-activator/>然后将结果发送到最终服务。

您可能希望reply-timeout在网关上放置 a,因为默认情况下,它将无限期等待,并且如果其中一项服务返回 null,则不会收到任何聚合响应。

请注意,我缩进了网关流只是为了显示它从网关运行,它们不是网关的子元素。

于 2013-02-27T15:03:50.947 回答
0

现在可以使用Spring Integration 4.1.0中引入的更简洁的方法来实现相同的行为,作为 EIP Scatter-Gather模式的实现。

Checkout Scatter-Gather 示例要点: https ://gist.github.com/noorulhaq/b13a19b9054941985109

于 2015-03-07T13:50:42.157 回答