我目前正在使用 NMS 开发基于应用程序的 ActiveMQ(5.6)。
我们有几个消费者(exe)试图从同一个队列(不是主题)接收massgaes。虽然所有消息都只发送给一个消费者,但我已经让消费者在收到消息后睡了几秒钟。顺便说一句,我们不希望消费者收到其他消费者收到的相同消息。
官网中提到,我们应该设置 Prefetch Limit 来决定在任何时间点可以将多少条消息流式传输给消费者。它既可以配置也可以编码。
我尝试的一种方法是使用 PrefetchPolicy 类绑定 ConnectionFactory 类,如下所示。
PrefetchPolicy poli = new PrefetchPolicy();
poli.QueuePrefetch = 0;
ConnectionFactory fac = new ConnectionFactory("activemq:tcp://Localhost:61616?jms.prefetchPolicy.queuePrefetch=1");
fac.PrefetchPolicy = poli;
using (IConnection con = fac.CreateConnection())
{
using (ISession se = con.CreateSession())
{
IDestination destination = SessionUtil.GetDestination(se, queue, DestinationType.Queue);
using (IMessageConsumer consumer = se.CreateConsumer(queue1))
{
con.Start();
while (true)
{
ITextMessage message = consumer.Receive() as ITextMessage;
Thread.Sleep(2000);
if (message != null)
{
Task.Factory.StartNew(() => extractAndSend(message.Text)); //do something
}
else
{
Console.WriteLine("No message received~");
}
}
}
}
}
但无论预取值是多少,我设置的消费者行为都与以前一样。
我已经尝试了第二种方法来获得结果,即配置服务器conf文件。我像下面这样更改服务器的 activemq.xml。" producerFlowControl="true" memoryLimit="5mb" /> " producerFlowControl="true" memoryLimit="5mb"> 但是虽然我设置了 dispatchpolicy,但消息仍然发送给一个消费者。
我想知道:是否可以通过仅配置服务器 xml 文件以使所有消费者从一个队列接收消息来实现此行为?如果是这样,如何配置它以及我的配置有什么问题?如果没有,我该如何使用代码来实现目标?谢谢。