我正在使用当前的 Apache.NMS 1.7.1 和 Apache.NMS.ActiveMQ 1.7.2。我正在使用IndividualAcknowledge
,所以我试图将加载的消息的数量保持在很低的水平,因为如果我在没有 Acking 的情况下加载了 >>1000 条消息,它会变得非常慢(它每次都在搜索所有消息的链接列表)。
我有以下代码片段:
BlockingCollection<IMessage> _collection = new BlockingCollection<IMessage>();
var factory = new ConnectionFactory("activemq:tcp://localhost:61616");
var _connection = (Connection) factory.CreateConnection();
_connection.PrefetchPolicy.All = 1000;
var session = (Session) _connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
var destination = SessionUtil.GetDestination(session, "queue://testQueue");
var messageConsumer = (MessageConsumer)session.CreateConsumer(destination);
messageConsumer.Listener += message => _collection.Add(message);
_connection.Start();
该队列testQueue
包含 >>20_000 条消息。等待几秒钟后,_collection
包含所有消息,而我没有确认任何消息。
如果我理解正确的文档,我应该最多得到 1000 个,直到我开始承认它们。
一旦代理向消费者发送了预取限制数量的消息,它就不会再向该消费者发送任何消息,直到消费者确认了它收到的至少 50% 的预取消息,例如 prefetch/2。当代理收到所述确认后,它将向消费者发送更多预取/2 条消息,以“充值”,就像它的预取缓冲区一样。
我还尝试了一些变体,例如仅QueuePrefetch
在 url 中设置或设置策略:
activemq:tcp://localhost:61616?nms.prefetchPolicy.queuePrefetch=100
或在队列中:
queue://testQueue?consumer.prefetchSize=100
关于 的缓慢IndividualAcknowledge
,我已经尝试了其他几个选项,但运气不佳:
messageConsumer.OptimizeAcknowledge = true;
messageConsumer.OptimizeAcknowledgeTimeOut = 1000;
messageConsumer.OptimizedAckScheduledAckInterval = 500;
虽然我并不完全清楚最后一个选项的区别。