我是spring cloud的新手,希望将我们的mono结构更改为微服务,这首先说明了我现在想要做的事情如下
- 接收请求以从不同来源调用 Web 服务(外部系统)。在任何特定时间,这可以是 1 个请求或最多 100K 个请求。
- 外部系统支持批量,所以如果我可以汇总消息并批量发送它们会更好。例如,继续聚合直到达到数量阈值(100 条消息)或达到时间阈值 2 秒。
- 另外,如果我收到错误,我想以指数方式退后
我的第一个想法是在执行上述聚合的 Sink 之前创建一个处理器。
这是云计算中的正确思维方式还是他们的另一条路要走?
工作解决方案
@EnableBinding(Processor.class)
class Configuration {
@Autowired
Processor processor;
@ServiceActivator(inputChannel = Processor.INPUT)
@Bean
public MessageHandler aggregator() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
new SimpleMessageStore(10));
//AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
//aggregatorFactoryBean.setMessageStore();
aggregatingMessageHandler.setOutputChannel(processor.output());
//aggregatorFactoryBean.setDiscardChannel(processor.output());
aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
aggregatingMessageHandler.setSendTimeout(1000L);
aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("'FOO'"));
aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
return aggregatingMessageHandler;
}
}