0

我是spring cloud的新手,希望将我们的mono结构更改为微服务,这首先说明了我现在想要做的事情如下

  1. 接收请求以从不同来源调用 Web 服务(外部系统)。在任何特定时间,这可以是 1 个请求或最多 100K 个请求。
  2. 外部系统支持批量,所以如果我可以汇总消息并批量发送它们会更好。例如,继续聚合直到达到数量阈值(100 条消息)或达到时间阈值 2 秒。
  3. 另外,如果我收到错误,我想以指数方式退后

我的第一个想法是在执行上述聚合的 Sink 之前创建一个处理器。

这是云计算中的正确思维方式还是他们的另一条路要走?


工作解决方案

@EnableBinding(Processor.class)
class Configuration {

    @Autowired
    Processor processor;


    @ServiceActivator(inputChannel = Processor.INPUT)
    @Bean
    public MessageHandler aggregator() {

        AggregatingMessageHandler aggregatingMessageHandler =
                new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                        new SimpleMessageStore(10));

        //AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        //aggregatorFactoryBean.setMessageStore();
        aggregatingMessageHandler.setOutputChannel(processor.output());
        //aggregatorFactoryBean.setDiscardChannel(processor.output());
        aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
        aggregatingMessageHandler.setSendTimeout(1000L);
        aggregatingMessageHandler.setCorrelationStrategy(new  ExpressionEvaluatingCorrelationStrategy("'FOO'"));
        aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
        aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
        aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
        aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
        return aggregatingMessageHandler;
    }
}
4

1 回答 1

0

您可以编写一个aggregator处理器应用程序,将多条消息组合成一条消息。有关 Spring Integration 聚合器的更多信息,请参阅此处

于 2016-11-24T09:37:12.167 回答