1

我目前正在使用 NMS 开发基于应用程序的 ActiveMQ(5.6)。

我们有几个消费者(exe)试图从同一个队列(不是主题)接收massgaes。虽然所有消息都只发送给一个消费者,但我已经让消费者在收到消息后睡了几秒钟。顺便说一句,我们不希望消费者收到其他消费者收到的相同消息。

官网中提到,我们应该设置 Prefetch Limit 来决定在任何时间点可以将多少条消息流式传输给消费者。它既可以配置也可以编码。

我尝试的一种方法是使用 PrefetchPolicy 类绑定 ConnectionFactory 类,如下所示。

PrefetchPolicy poli = new PrefetchPolicy();
poli.QueuePrefetch = 0;
ConnectionFactory fac = new ConnectionFactory("activemq:tcp://Localhost:61616?jms.prefetchPolicy.queuePrefetch=1");
fac.PrefetchPolicy = poli;
using (IConnection con = fac.CreateConnection())
{
   using (ISession se = con.CreateSession())
   {
       IDestination destination = SessionUtil.GetDestination(se, queue, DestinationType.Queue);
       using (IMessageConsumer consumer = se.CreateConsumer(queue1))
       {
          con.Start();
          while (true)
          {
              ITextMessage message = consumer.Receive() as ITextMessage;
              Thread.Sleep(2000);
              if (message != null)
              {
                Task.Factory.StartNew(() => extractAndSend(message.Text));   //do something
              }
              else
              {
                Console.WriteLine("No message received~");
              }
        }
       }
   }
}

但无论预取值是多少,我设置的消费者行为都与以前一样。

我已经尝试了第二种方法来获得结果,即配置服务器conf文件。我像下面这样更改服务器的 activemq.xml。" producerFlowControl="true" memoryLimit="5mb" /> " producerFlowControl="true" memoryLimit="5mb"> 但是虽然我设置了 dispatchpolicy,但消息仍然发送给一个消费者。

我想知道:是否可以通过仅配置服务器 xml 文件以使所有消费者从一个队列接收消息来实现此行为?如果是这样,如何配置它以及我的配置有什么问题?如果没有,我该如何使用代码来实现目标?谢谢。

4

1 回答 1

2

看看“消息组”功能。

我有同样的问题。只有一个消费者处理了所有消息。我在我的代码中发现我在发送期间使用了组标头:

request.Properties["NMSXGroupID"] = "cheese";

根据官方文档:

标准 JMS 头 JMSXGroupID 用于定义消息所属的消息组。然后,消息组功能确保同一消息组的所有消息都将发送到同一个 JMS 使用者——同时该使用者保持活动状态。一旦消费者死亡,将选择另一个。

请参阅http://activemq.apache.org/message-groups.html的完整详细信息

于 2014-02-26T16:12:08.163 回答