我的 Spring Integration 应用程序使用来自 RabbitMQ 的消息将它们转换为 SOAP 消息并执行 Web 服务请求。
每秒可以从队列中获取许多 (10 – 50) 条消息。或者在应用程序重新启动后,RabbitMQ 队列中可能有数千条消息。
在并行线程中处理多达 10 条消息的最佳可能方案是什么(消息排序很好但不是必需的功能,如果 Web 服务以业务失败回答,那么失败的消息应该重试直到成功)。
Amqp 侦听器不应从队列中消耗更多消息,因为任务执行器中没有可用的繁忙线程。我可以在这样的通道中定义一个 ThreadExecutor:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue);
}
IntegrationFlow integrationFlow = IntegrationFlows
.from(amqpInboundChannelAdapter)
.channel(c -> c.executor(exportFlowsExecutor))
.transform(businessObjectToSoapRequestTransformer)
.handle(webServiceOutboundGatewayFactory.getObject())
.get();
或者像这样在 AmqpInboundChannelAdapter 中定义一个任务执行器并且不在流定义中定义通道任务执行器就足够了:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.taskExecutor(taskExecutor));
}
或者可能为类似于选项 1 的通道定义任务执行器,但另外在通道适配器上设置 maxConcurrentConsumers,如下所示:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.maxConcurrentConsumers(10));
}