2

我们有一个接收消息并由应用程序处理的 MSMQ 队列设置。我们想让另一个进程订阅队列,然后读取消息并记录它的内容。

我已经有了这个,问题是它一直在偷看队列。运行时服务器上的 CPU 约为 40%。mqsvc.exe 以 30% 运行,此应用以 10% 运行。我宁愿有一些东西只是等待消息进来,得到它的通知,然后记录它而不不断地轮询服务器。

    Dim lastid As String
    Dim objQueue As MessageQueue
    Dim strQueueName As String

    Public Sub Main()
        objQueue = New MessageQueue(strQueueName, QueueAccessMode.SendAndReceive)
        Dim propertyFilter As New MessagePropertyFilter
        propertyFilter.ArrivedTime = True
        propertyFilter.Body = True
        propertyFilter.Id = True
        propertyFilter.LookupId = True
        objQueue.MessageReadPropertyFilter = propertyFilter
        objQueue.Formatter = New ActiveXMessageFormatter
        AddHandler objQueue.PeekCompleted, AddressOf MessageFound

        objQueue.BeginPeek()
    end main

    Public Sub MessageFound(ByVal s As Object, ByVal args As PeekCompletedEventArgs)

        Dim oQueue As MessageQueue
        Dim oMessage As Message

        ' Retrieve the queue from which the message originated
        oQueue = CType(s, MessageQueue)

            oMessage = oQueue.EndPeek(args.AsyncResult)
            If oMessage.LookupId <> lastid Then
                ' Process the message here
                lastid = oMessage.LookupId
                ' let's write it out
                log.write(oMessage)
            End If

        objQueue.BeginPeek()
    End Sub
4

5 回答 5

4

您是否尝试过使用MSMQEvent.Arrived来跟踪消息?

当表示打开队列的 MSMQQueue 对象实例的 MSMQQueue.EnableNotification 方法已被调用并且消息找到或到达队列中的适用位置时,将触发 MSMQEvent 对象的 Arrived 事件。

于 2010-05-13T14:12:40.427 回答
3

peek 迭代之间的 Thread.Sleep(10) 可以为您节省大量周期。

我能想到的唯一其他选择是将日志记录构建到队列读取应用程序中。

于 2010-05-03T19:09:11.593 回答
1

没有任何 API 可以让您只查看每条消息一次。

问题是BeginPeek如果队列上已经有消息,它会立即执行它的回调。由于您没有删除消息(毕竟这是偷看MessageFound,而不是接收!),当您的回调再次开始偷看时,该过程重新开始,因此几乎不断运行。

您最好的选择是将消息记录在作者或阅读器中。日记将在短期内起作用(如果您只关心收到的消息),但不是长期解决方案:

虽然从配置了日志的队列中检索消息的性能开销仅比不使用日志检索消息多 20% 左右,但真正的成本是当未经检查的 MSMQ 服务内存不足或机器磁盘不足时导致的意外问题空间

于 2010-05-03T19:38:25.897 回答
0

这对我有用。它在等待消息时阻塞线程。每个循环周期都会检查类成员_bServiceRunning以查看线程是否应该中止。

    private void ProcessMessageQueue(MessageQueue taskQueue)
    {
        // Set the formatter to indicate body contains a binary message:
        taskQueue.Formatter = new BinaryMessageFormatter();

        // Specify to retrieve selected properties.
        MessagePropertyFilter myFilter = new MessagePropertyFilter();
        myFilter.SetAll();
        taskQueue.MessageReadPropertyFilter = myFilter;

        TimeSpan tsQueueReceiveTimeout = new TimeSpan(0, 0, 10); // 10 seconds

        // Monitor the MSMQ until the service is stopped:
        while (_bServiceRunning)
        {
            rxMessage = null;

            // Listen to the queue for the configured duration:
            try
            {
                // See if a message is available, and if so remove if from the queue if any required
                // web service is available:
                taskQueue.Peek(tsQueueReceiveTimeout);

                // If an IOTimeout was not thrown, there is a message in the queue
                // Get all the messages; this does not remove any messages
                Message[] arrMessages = taskQueue.GetAllMessages();

                // TODO: process the message objects here;
                //       they are copies of the messages in the queue
                //       Note that subsequent calls will return the same messages if they are
                //       still on the queue, so use some structure defined in an outer block
                //       to identify messages already processed.

            }
            catch (MessageQueueException mqe)
            {
                if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    // The peek message time-out has expired; there are no messages waiting in the queue
                    continue; // at "while (_bServiceRunning)"
                }
                else
                {
                    ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, mqe);
                    break; // from "while (_bServiceRunning)"
                }
            }
            catch (Exception ex)
            {
                ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, ex);
                break; // from "while (_bServiceRunning)"
            }
        }

    } // ProcessMessageQueue()
于 2010-05-03T20:03:59.223 回答
0

恕我直言,您应该只在队列上打开日志。然后,您可以保证保留已提交到队列的所有消息的副本,而您费力地尝试建立自己的机制来记录所有消息时,情况并非如此。

如果您想要比队列本身更容易阅读的东西(我当然想要),那么按计划记录和删除日志消息会更容易和更可靠。然后,这个过程的运行速度有多快并不重要,您只需要获取一次消息,总的来说,这只是解决问题的更好方法。

于 2017-09-04T07:50:18.163 回答