1

我有一个适用于队列通道的 dsl 流。但是,当我使用 Rendezvous 通道使其同步时,我得到的确认速率最多为 30 条消息/秒。我的处理程序只需 350 微秒即可完成该过程,但确认率一直很低。这大大增加了兔子队列。我什至将并发消费者扩展到 10 个并增加了预取,但这并没有帮助。然后我自己添加了几个更缩放的实例,但这有助于将 ack rate 提高到 45/sec 左右。

我怎样才能让流确认更快?我预计每秒超过 500 个的速率。

DSL流:

SimpleMessageListenerContainer simpleMessageListenerContainer = profileTagRabbitMLCConfig.transactedChannelSpanRabbitSMLC(queueName)

simpleMessageListenerContainer?.setConcurrentConsumers(concurrentConsumer)
            simpleMessageListenerContainer?.setPrefetchCount(prefetch)

            return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer))
                    .channel(rendezvousTransformerChannel1())
                    .transform(myTransformer, 'transform', { e -> e.advice(adviceWithRecoverer) })
                    .channel(rendezvousTransformerChannel2())
                    .handle(myHandler, 'save', { e -> e.advice(adviceWithRecoverer) })
                    .get()

同步频道:

@Bean
MessageChannel rendezvousTransformerChannel1() {
    return MessageChannels.rendezvous().get()
}

@Bean
MessageChannel rendezvousHandlerChannel() {
    return MessageChannels.rendezvous().get()
}

容器:

SimpleMessageListenerContainer 
transactedChannelSpanRabbitSMLC(CachingConnectionFactory rabbitConnectionFactory, String queueName){

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer()
    container.setConnectionFactory(rabbitConnectionFactory)
    container.setQueueNames(queueName)
    container.setChannelTransacted(true)
    container
}

重试恢复建议:

Advice getRetryAdviceWithRecovery() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice()
    advice.setRetryTemplate(getRetryTemplate())
    advice.recoveryCallback = getRecoveryCallback() // sends message to rabbit exchange
    advice
}

轮询器:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(100).maxMessagesPerPoll(500L).get();
    }
4

1 回答 1

0

您的用例是什么促使您使用RendezvousChannels ?

这非常罕见,而且我认为我从未在同一流程中看到过 2。

您必须有一个用于此通道类型的轮询器;我没有看到它们,所以这意味着你有一个默认的 poller bean。

你需要展示你的投票器,但我怀疑它没有很好地调整这个。ARendezvousChannel send()阻塞,直到某事执行 a receive()

无论如何,如果您在侦听器容器线程上使用任何类型的线程切换 ( QueueChannel, ),您将面临消息丢失的风险。RendezvousChannel

您可能应该只.channel()从您的流程中删除那些将使用DirectChannels 的 s 。

如果您想要并发,请使用concurrentConsumers侦听器容器上的属性。

container.setChannelTransacted(true)

如果您在 中发布消息myHandler,则事务也非常昂贵。

于 2018-03-04T02:13:39.450 回答