1

我正在使用 spring-rabbit-1.7.3.RELEASE.jar

我在我的 xml 中使用 shutdownTimeout 参数定义了一个 SimpleMessageListenerContainer。

bean id="aContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="rabbitConnectionFactory" />
    <property name="queueNames" value="aQueue" />
    <property name="adviceChain" ref="retryAdvice" />
    <property name="acknowledgeMode" value="AUTO" />
    <property name="shutdownTimeout" value="900000" />
</bean>

当我的服务关闭并且“aQueue”中仍有消息时,我希望 shutdownTimeout 将允许消息得到处理。但这似乎不会发生。

经过进一步调查,我发现 SimpleMessageListenerContainer 中定义的 await() 方法总是返回 true。

this.cancellationLock.await(Long.valueOf(this.shutdownTimeout), TimeUnit.MILLISECONDS); 

我想了解 await 的逻辑是如何工作的,它是如何获取锁的,以及我需要什么额外的配置才能使代码正常工作。

4

1 回答 1

0

它即时等待消费者,那些忙于处理已获取但尚未确认的消息的消费者。在关机期间,没有人会从队列中轮询新消息。

ActiveObjectCounter 等待所有内部sCountDownLatch被释放。当我们:

public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

因此,这确实可能是一个事实,即您的所有消费者(private volatile int concurrentConsumers = 1;默认情况下)在此期间都被取消和释放shutdownTimeout

但同样:当状态为 时,没有人会从 Broker 轮询新消息shutdown

于 2018-01-18T19:19:29.390 回答