0

我的 Spring Integration 应用程序使用来自 RabbitMQ 的消息将它们转换为 SOAP 消息并执行 Web 服务请求。

每秒可以从队列中获取许多 (10 – 50) 条消息。或者在应用程序重新启动后,RabbitMQ 队列中可能有数千条消息。

在并行线程中处理多达 10 条消息的最佳可能方案是什么(消息排序很好但不是必需的功能,如果 Web 服务以业务失败回答,那么失败的消息应该重试直到成功)。

Amqp 侦听器不应从队列中消耗更多消息,因为任务执行器中没有可用的繁忙线程。我可以在这样的通道中定义一个 ThreadExecutor:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue);
}

IntegrationFlow integrationFlow = IntegrationFlows
  .from(amqpInboundChannelAdapter)
  .channel(c -> c.executor(exportFlowsExecutor))
  .transform(businessObjectToSoapRequestTransformer)
  .handle(webServiceOutboundGatewayFactory.getObject())
  .get();

或者像这样在 AmqpInboundChannelAdapter 中定义一个任务执行器并且不在流定义中定义通道任务执行器就足够了:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.taskExecutor(taskExecutor));
}

或者可能为类似于选项 1 的通道定义任务执行器,但另外在通道适配器上设置 maxConcurrentConsumers,如下所示:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.maxConcurrentConsumers(10));
}
4

1 回答 1

2

在容器上配置 a并让所有下游进程发生在容器中的那些线程concurrency上的最佳实践。ListenerContainer这样,当由于线程繁忙而不再从队列中轮询消息时,您将获得自然的背压。另一方面,消息不会丢失,因为ExecutorChannel在监听器容器之后,您将释放一个轮询线程并且当前消息将被确认为已使用,但您可能会在下游失败。

于 2018-08-28T20:46:00.623 回答