6

我已将 Spring DefaultMessageListenerContainer 配置为 ActiveMQ 消费者,从队列中消费消息。我们称它为“Test.Queue” 我将此代码部署在 4 台不同的机器上,并且所有机器都配置到同一个 ActiveMQ 实例以处理来自同一个“Test.Queue”队列的消息。

一旦所有 4 台机器都启动并运行,我将最大消费者大小设置为 20,我看到队列中的消费者数量为 80(4 * 最大消费者大小 = 80)

当生成并发送到队列的消息变高时,一切都很好。

当有 1000 条消息并且在 80 个消费者中,假设其中一个被卡住了,它会冻结 Active MQ 以停止向其他消费者发送消息。

所有消息都永远卡在 ActiveMQ 中。

由于我有 4 台机器,最多有 80 个消费者,我不知道哪个消费者没有确认。

我停止并重新启动所有 4 台机器,当我停止有坏消费者卡住的机器时,消息再次开始流动。

我不知道如何配置 DefaultMessageListenerContainer 以放弃不良消费者并立即向 ActiveMQ 发出信号以开始发送消息。

即使没有 Spring,我也能够创建场景,如下所示:

  1. 我产生了多达 5000 条消息并将它们发送到“Test.Queue”队列
  2. 我创建了 2 个消费者(消费者 A,B),在一个消费者 B 的 onMessage() 方法中,我让线程长时间休眠(Thread.sleep(Long.MAX_VALUE)),其条件类似于当前时间 % 13 时0 然后让线程休眠。

  3. 跑这两个消费者。

  4. 去Active MQ,发现队列有2个消费者。
  5. A 和 B 都在处理消息
  6. 在某个时间点,消费者 B 的 onMessage() 被调用,当当前时间 % 13 为 0 的条件得到满足时,它使线程进入睡眠状态。
  7. 消费者 B 卡住了,无法向代理确认
  8. 我回到 Active MQ Web 控制台,仍然看到消费者为 2,但没有消息出列。
  9. 现在我创建了另一个消费者 C 并运行它来消费。
  10. 只有 ActiveMQ 中的消费者数量从 2 上升到 3。
  11. 但是消费者 C 没有消费任何东西,因为代理未能发送任何包含它们的消息,因为它仍在等待消费者 B 确认它。
  12. 我还注意到消费者 A 没有消费任何东西
  13. 我去杀死消费者 B ,现在所有消息都已耗尽。

假设 A、B、C 由 Spring 的 DefaultMessageListenerContainer 管理,我如何调整 Spring DefaultMessageListenerContainer 以在 X 秒内未能确认后立即将不良消费者(在我的情况下为消费者 B)从池中取出,立即确认代理代理不会永远持有消息。

谢谢你的时间。

感谢我是否能解决这个问题。

4

1 回答 1

3

这里有几个选项可以尝试...

  1. 将队列预取设置为 0 以促进更好地分布在消费者之间并减少特定消费者的“卡住”消息。见http://activemq.apache.org/what-is-the-prefetch-limit-for.html

  2. 在连接上设置 "?useKeepAlive=false&wireFormat.maxInactivityDuration=20000" 以在指定的非活动时间后使慢速消费者超时

  3. 设置队列策略“slowConsumerStrategy->abortSlowConsumer”...再次超时慢消费者

    <policyEntry ...
      ...
      <slowConsumerStrategy>
          <abortSlowConsumerStrategy />
      </slowConsumerStrategy>
      ...
    </policyEntry> 
    
于 2012-08-21T16:01:49.880 回答