我已将 Spring DefaultMessageListenerContainer 配置为 ActiveMQ 消费者,从队列中消费消息。我们称它为“Test.Queue” 我将此代码部署在 4 台不同的机器上,并且所有机器都配置到同一个 ActiveMQ 实例以处理来自同一个“Test.Queue”队列的消息。
一旦所有 4 台机器都启动并运行,我将最大消费者大小设置为 20,我看到队列中的消费者数量为 80(4 * 最大消费者大小 = 80)
当生成并发送到队列的消息变高时,一切都很好。
当有 1000 条消息并且在 80 个消费者中,假设其中一个被卡住了,它会冻结 Active MQ 以停止向其他消费者发送消息。
所有消息都永远卡在 ActiveMQ 中。
由于我有 4 台机器,最多有 80 个消费者,我不知道哪个消费者没有确认。
我停止并重新启动所有 4 台机器,当我停止有坏消费者卡住的机器时,消息再次开始流动。
我不知道如何配置 DefaultMessageListenerContainer 以放弃不良消费者并立即向 ActiveMQ 发出信号以开始发送消息。
即使没有 Spring,我也能够创建场景,如下所示:
- 我产生了多达 5000 条消息并将它们发送到“Test.Queue”队列
我创建了 2 个消费者(消费者 A,B),在一个消费者 B 的 onMessage() 方法中,我让线程长时间休眠(Thread.sleep(Long.MAX_VALUE)),其条件类似于当前时间 % 13 时0 然后让线程休眠。
跑这两个消费者。
- 去Active MQ,发现队列有2个消费者。
- A 和 B 都在处理消息
- 在某个时间点,消费者 B 的 onMessage() 被调用,当当前时间 % 13 为 0 的条件得到满足时,它使线程进入睡眠状态。
- 消费者 B 卡住了,无法向代理确认
- 我回到 Active MQ Web 控制台,仍然看到消费者为 2,但没有消息出列。
- 现在我创建了另一个消费者 C 并运行它来消费。
- 只有 ActiveMQ 中的消费者数量从 2 上升到 3。
- 但是消费者 C 没有消费任何东西,因为代理未能发送任何包含它们的消息,因为它仍在等待消费者 B 确认它。
- 我还注意到消费者 A 没有消费任何东西
- 我去杀死消费者 B ,现在所有消息都已耗尽。
假设 A、B、C 由 Spring 的 DefaultMessageListenerContainer 管理,我如何调整 Spring DefaultMessageListenerContainer 以在 X 秒内未能确认后立即将不良消费者(在我的情况下为消费者 B)从池中取出,立即确认代理代理不会永远持有消息。
谢谢你的时间。
感谢我是否能解决这个问题。