我正在尝试使用 Amqp.InboundAdapter 从 RabbitMQ 队列中读取数据,对其进行处理,然后将其推送到另一个队列中。
所以流程看起来像这样:
Amqp.InboundAdapter --> 服务激活器 --> Amqp.outboundAdapter
但问题是处理速度非常非常慢。rabbitmq 控制台的传输速率是 0.2/sec 或 1sec,非常慢。
什么可能导致这种缓慢?
IntegrationFlows.from(
Amqp.inboundAdapter(connectionFactory, IncomingQueue)
.configureContainer(c -> {
c.concurrentConsumers(classicFlatFileConcurrentConsumers);
c.prefetchCount(incomingPrefetchCount);
c.adviceChain(new Advice[]{retryOperationsInterceptor});
c.channelTransacted(false);
})
)
.handle(myTransformer)
.split()
.filter(ResultDTO.class, dto -> dto.getID() != null)
.handle(Amqp.outboundAdapter(rabbitTemplate).routingKey(OutputQueueName))
.get();
请注意,我在 Amqp.OutboundAdapter 中使用 rabbitTemplate,它的 usePublisherConnection 设置为 True。
当我不使用 spring 集成时,rabbitmq 控制台中的消息速率大约为 1000 msg/sec。