5

我有一个远程服务,当发生特定事件时,我正在调用它来加载产品的定价数据。一旦加载,产品定价就会被广播给另一个消费者以在其他地方处理。

我不想在每个事件上调用远程服务,而是将事件分成小组,然后一次性发送。

我根据聚合器拼凑了以下模式。虽然它有效,但很多“闻起来”——尤其是我的SimpleCollatingAggregator. 我是 Spring Integration 和一般 EIP 的新手,我怀疑我在滥用组件。

编码

我的代码通过调用下面的方法在代码的其他地方触发@Gateway

public interface ProductPricingGateway {    
    @Gateway(requestChannel="product.pricing.outbound.requests")
    public void broadcastPricing(ProductIdentifer productIdentifier);
}

然后将其连接到聚合器,如下所示:

<int:channel id="product.pricing.outbound.requests" />
<int:channel id="product.pricing.outbound.requests.batch" />
<int:aggregator input-channel="product.pricing.outbound.requests"
output-channel="product.pricing.outbound.requests.batch" release-strategy="releaseStrategy"
    ref="collatingAggregator" method="collate"
    correlation-strategy-expression="0"
    expire-groups-upon-completion="true" 
    send-partial-result-on-expiry="true"/>  
<bean id="collatingAggregator" class="com.mangofactory.pricing.SimpleCollatingAggregator" />
<bean id="releaseStrategy" class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
    <!-- Release when: 10 Messages ... or ... -->
    <constructor-arg index="0" value="10" />
    <!-- ... 5 seconds since first request -->
    <constructor-arg index="1" value="5000" />
</bean>

这是聚合器的实现:

public class SimpleCollatingAggregator {

    public List<?> collate(List<?> input)
    {
        return input;
    }

}

最后,这被消耗在以下内容上@ServiceActivator

@ServiceActivator(inputChannel="product.pricing.outbound.requests.batch")
public void fetchPricing(List<ProductIdentifer> identifiers)
{
        // omitted
}

注意:在实践中,我也使用@Async,以使调用代码尽可能快地返回。我对此也有很多问题,我将转到一个单独的问题。

问题 1: 考虑到我想要实现的目标,聚合器模式在这里是一个合适的选择吗?这感觉就像很多样板——有更好的方法吗?

问题 2: 我使用固定的排序规则值0, 来有效地说:“无论您如何对这些消息进行分组,都可以随它们来。”

这是实现这一目标的适当方法吗?

问题3: SimpleCollatingAggregator对我来说简直是错误的。

我希望它接收我的个人入站ProductIdentifier对象,并将它们分组,然后传递它们。这行得通,但它合适吗?有没有更好的方法来实现同样的目标?

4

1 回答 1

3

Q1:是的,但请参阅 Q3 和下面的进一步讨论。
Q2:这是说“不需要相关性”的正确方式(但你需要expire-groups-on-completion你拥有的 )。
Q3:这种情况下,不需要自定义Aggregator,直接使用默认即可(去掉ref和method属性)。

请注意,聚合器是被动组件;释放是由新消息的到达触发的;因此,您的发布策略的第二部分只会在新消息到达时启动(它不会在 5 秒后自发释放组)。

但是,您可以MessageGroupStoreReaper为此目的配置一个:http: //static.springsource.org/spring-integration/reference/html/messaging-routing-chapter.html#aggregator

于 2013-04-04T16:14:27.003 回答