0

场景:我向名为 JUST.CN 的队列发送五万条消息。并且每 1000 条消息设置一个消息属性字符串“myfilter='abc'”。现在我创建具有相同选择器的消费者来消费消息。但是消费速度非常慢,尤其是在 30000 条消息之后。我无法更改 activeMQ 中的默认配置。核心代码如下:

 IDestination destination = SessionUtil.GetDestination(session, "JUST.CN");
                    IMessageProducer producer = session.CreateProducer(destination);
                    string msg = "Hello hello hello world!~~testing~Hello hello hello world!~~testing~";

                    for (int i = 0; i < 50000; i++)
                    {
                        ITextMessage message;

                        if (i % 1000 == 0)
                        {
                            message = session.CreateTextMessage(msg);
                            message.Properties.SetString("myfilter", "abc");
                        }
                        else
                        {
                            message = session.CreateTextMessage(msg);
                        }
                        producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);
                    }

消费者代码:

 IDestination destination = SessionUtil.GetDestination(session, "JUST.CN");             
 IMessageConsumer consumer = session.CreateConsumer(destination, "myfilter='abc'", false);

                    int count = 0;
                    DateTime dtstart = DateTime.Now;
                 for (int i = 0; i < 50; i++)
                    {
                        IMessage iMsg = consumer.Receive();
                        ITextMessage msg = (ITextMessage)iMsg;
                        Console.WriteLine(msg.Text);
                        count++;

                    }
                    DateTime dtend = DateTime.Now;
                    TimeSpan time = dtend - dtstart;
                    Console.WriteLine(time);
                    Console.WriteLine(count);

我需要为 ActiveMQ 的选择器使用任何特殊设置吗?提前感谢您的任何意见。

4

1 回答 1

2

通常,将消息选择器与队列一起使用是一种反模式。几年前在Ade on Middleware的博客上有一篇很好的文章,说明了为什么这样做。

如果您正在考虑在队列上使用消息选择器,则该用例通常是某些消费者只对某些消息感兴趣。您可以通过使用在代理上配置的复合目标以更好的方式解决此用例,该复合目标应用过滤器(通过filteredDestination),该过滤器等效于选择逻辑:

<broker xmlns="http://activemq.apache.org/schema/core">
  <destinationInterceptors>
    <virtualDestinationInterceptor>
      <virtualDestinations>
        <compositeQueue name="myapp.in" forwardOnly="true">
          <forwardTo>
            <filteredDestination selector="myHeader > 5" queue="myapp.out.high"/>
            <filteredDestination selector="myHeader <= 5" queue="myapp.out.low"/>
          </forwardTo>
        </compositeQueue>
      </virtualDestinations>
    </virtualDestinationInterceptor>
  </destinationInterceptors>
</broker> 

这里发生的情况是,当消息到达myapp.in队列时,SQL92 过滤器针对消息运行,并且消息被适当地排序。只想消费高消息订阅的订阅者myapp.out.high

通过这样做,您可以有效地将问题颠倒过来,并消除在使用消息时进行复杂处理的需要。

于 2014-01-13T20:51:05.447 回答