5

我在 .Net 程序中使用 ActiveMQ,并且充斥着消息事件。

简而言之,当我收到一个队列事件“onMessage(IMessage receivedMsg)”时,我将消息放入一个内部队列中,X 线程在其中执行它们的操作。

起初我在创建会话时有:'AcknowledgementMode.AutoAcknowledge',所以我猜测队列中的所有消息都被吸入并放入内存队列(这是有风险的,因为崩溃,一切都丢失了)。

所以我在创建会话时使用了:'AcknowledgementMode.ClientAcknowledge',当工作人员准备好消息时,它调用消息上的'commit()'方法。但是,仍然所有消息都从队列中被吸走。

我如何配置它只处理 X 数量的消息或在内部队列中,而不是立即“下载”所有内容?

4

2 回答 2

3

您使用的是 .NET 4.0 吗?您可以使用 BlockingCollection 。将其设置为它可能包含的最大数量。一旦一个线程试图放入一个多余的元素,添加操作就会阻塞,直到集合再次低于阈值。

也许这会起到节流作用?

Rx框架中还有一个用于节流的API,但我不知道它是如何实现的。如果您将 Queue 源实现为 Observable,则此 API 将为您提供,但我不知道这是否符合您的需求。

于 2010-11-25T13:51:21.750 回答
2

您可以设置客户端预取来控制客户端将发送多少条消息。当 Session 处于 Auto Ack 时,客户端只会在通过 onMessage 回调或通过同步接收将消息传递到您的应用程序后才确认消息。默认情况下,客户端将从代理预取 1000 条消息,如果客户端出现故障,这些消息将重新传递给另一个客户端,这是一个队列,否则对于一个主题,它们将被丢弃,因为主题是基于广播的频道。如果您将预取设置为一个,那么您的客户端只会从服务器发送一条消息,然后每次您的 onMessage 回调完成时,都会发送一条新消息,因为客户端会确认该消息,即如果会话处于自动确认状态模式。

有关所有选项,请参阅 NMS 配置页面:http: //activemq.apache.org/nms/configuring.html

问候

蒂姆。FuseSource.com

于 2010-11-25T23:33:55.377 回答