我有一个适用于队列通道的 dsl 流。但是,当我使用 Rendezvous 通道使其同步时,我得到的确认速率最多为 30 条消息/秒。我的处理程序只需 350 微秒即可完成该过程,但确认率一直很低。这大大增加了兔子队列。我什至将并发消费者扩展到 10 个并增加了预取,但这并没有帮助。然后我自己添加了几个更缩放的实例,但这有助于将 ack rate 提高到 45/sec 左右。
我怎样才能让流确认更快?我预计每秒超过 500 个的速率。
DSL流:
SimpleMessageListenerContainer simpleMessageListenerContainer = profileTagRabbitMLCConfig.transactedChannelSpanRabbitSMLC(queueName)
simpleMessageListenerContainer?.setConcurrentConsumers(concurrentConsumer)
simpleMessageListenerContainer?.setPrefetchCount(prefetch)
return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer))
.channel(rendezvousTransformerChannel1())
.transform(myTransformer, 'transform', { e -> e.advice(adviceWithRecoverer) })
.channel(rendezvousTransformerChannel2())
.handle(myHandler, 'save', { e -> e.advice(adviceWithRecoverer) })
.get()
同步频道:
@Bean
MessageChannel rendezvousTransformerChannel1() {
return MessageChannels.rendezvous().get()
}
@Bean
MessageChannel rendezvousHandlerChannel() {
return MessageChannels.rendezvous().get()
}
容器:
SimpleMessageListenerContainer
transactedChannelSpanRabbitSMLC(CachingConnectionFactory rabbitConnectionFactory, String queueName){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer()
container.setConnectionFactory(rabbitConnectionFactory)
container.setQueueNames(queueName)
container.setChannelTransacted(true)
container
}
重试恢复建议:
Advice getRetryAdviceWithRecovery() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice()
advice.setRetryTemplate(getRetryTemplate())
advice.recoveryCallback = getRecoveryCallback() // sends message to rabbit exchange
advice
}
轮询器:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(100).maxMessagesPerPoll(500L).get();
}