0

我正在使用当前的 Apache.NMS 1.7.1 和 Apache.NMS.ActiveMQ 1.7.2。我正在使用IndividualAcknowledge,所以我试图将加载的消息的数量保持在很低的水平,因为如果我在没有 Acking 的情况下加载了 >>1000 条消息,它会变得非常慢(它每次都在搜索所有消息的链接列表)。

我有以下代码片段:

BlockingCollection<IMessage> _collection = new BlockingCollection<IMessage>();
var factory = new ConnectionFactory("activemq:tcp://localhost:61616");
var _connection = (Connection) factory.CreateConnection();
_connection.PrefetchPolicy.All = 1000;
var session = (Session) _connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
var destination = SessionUtil.GetDestination(session, "queue://testQueue");
var messageConsumer = (MessageConsumer)session.CreateConsumer(destination);
messageConsumer.Listener += message => _collection.Add(message);
_connection.Start();

该队列testQueue包含 >>20_000 条消息。等待几秒钟后,_collection包含所有消息,而我没有确认任何消息。

如果我理解正确的文档,我应该最多得到 1000 个,直到我开始承认它们。

一旦代理向消费者发送了预取限制数量的消息,它就不会再向该消费者发送任何消息,直到消费者确认了它收到的至少 50% 的预取消息,例如 prefetch/2。当代理收到所述确认后,它将向消费者发送更多预取/2 条消息,以“充值”,就像它的预取缓冲区一样。

我还尝试了一些变体,例如仅QueuePrefetch 在 url 中设置或设置策略

activemq:tcp://localhost:61616?nms.prefetchPolicy.queuePrefetch=100

在队列中

queue://testQueue?consumer.prefetchSize=100

关于 的缓慢IndividualAcknowledge,我已经尝试了其他几个选项,但运气不佳:

messageConsumer.OptimizeAcknowledge = true;
messageConsumer.OptimizeAcknowledgeTimeOut = 1000;
messageConsumer.OptimizedAckScheduledAckInterval = 500;

虽然我并不完全清楚最后一个选项的区别。

4

1 回答 1

1

因为您使用的是异步侦听器,所以当客户端在将每条消息传递给您的异步事件侦听器时继续向代理授予信用时,代理将向您发送所有内容。为了真正限制在任何给定时间传递给客户端的消息数量,客户端需要使用同步接收调用。单个确认最好与同步消费配对,这样您就可以控制读取多少消息并在准备好时在某个时间点确认它们。

优化的确认设置不适用于单独的确认模式,因此对性能没有帮助。

于 2017-04-19T22:06:02.930 回答