9

每秒只有 20 条消息!这就是我的全部!这是从队列中查看 50 条消息并使用 ReceiveById 并行接收它们的代码。队列中的消息总数为 500。我也测试了其他数字。但最高限制是每秒 20 条消息!我是在某个完全不碍事的地方吗?

编辑1:

1 - 我需要队列是可恢复的。但有趣的是,即使我将可恢复选项设置为 false;仍然最高限制是 20 条消息/秒。

2 - 我被迫在这里使用 MSMQ,因为涉及到一些遗留应用程序。但如果这段代码是正确的,并且前 20 名的限制确实存在,我可以说服小组切换。因此,任何关于替换 MSMQ 的建议(基于实际经验)都非常受欢迎(请注意,我们需要保留我们的消息,以防出现任何类型的故障)。

3 - 我已将 ThreadPool 中的线程数设置为较高的数字,以防万一,但实际上在此代码中,它将导致创建 100 - 200 个线程。我测试了从 50 到 10000 的不同数字,没有任何区别。

4 - 在每个任务中创建一个新的 MessageQueue,因为 ReceiveById 不是线程安全的。

5 - 正如我们在代码中看到的,消息大小非常小;它只是一个字符串加一个整数。

编辑2:[非常奇怪的新结果]

我玩过这段代码的每一部分,发现了这一点:如果我注释掉singleLocal.UseJournalQueue = false; 在我的任务中,我每秒最多可以阅读 1200 条消息。不令人印象深刻,但在我的情况下可以接受。奇怪的是 UseJournalQueue 的默认值是 false;为什么再次将其设置为 false 会在性能上产生如此大的差异?

static partial class Program
{
    static void Main(string[] args)
    {
        ThreadPool.SetMaxThreads(15000, 30000);
        ThreadPool.SetMinThreads(10000, 20000);

        var qName = @".\private$\deep_den";

        if (!MessageQueue.Exists(qName))
        {
            var q = MessageQueue.Create(qName);
        }

        var single = new MessageQueue(qName);
        single.UseJournalQueue = false;
        single.DefaultPropertiesToSend.AttachSenderId = false;
        single.DefaultPropertiesToSend.Recoverable = true;
        single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

        var count = 500;
        var watch = new Stopwatch();

        watch.Start();
        for (int i = 0; i < count; i++)
        {
            var data = new Data { Name = string.Format("name_{0}", i), Value = i };

            single.Send(new Message(data));
        }
        watch.Stop();

        Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
        Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);

        var enu = single.GetMessageEnumerator2();

        watch.Reset();
        watch.Start();
        while (Interlocked.Read(ref __counter) < count)
        {
            var list = new List<Message>();
            var peekCount = 50;

            while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
            {
                try
                {
                    list.Add(enu.Current);
                    peekCount--;
                }
                catch (Exception ex2)
                {
                    Trace.WriteLine(ex2.ToString());
                    break;
                }
            }

            var tlist = new List<Task>();
            foreach (var message in list)
            {
                var stupid_closure = message;

                var t = new Task(() =>
                {
                    using (var singleLocal = new MessageQueue(qName))
                    {
                        singleLocal.UseJournalQueue = false;
                        singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
                        singleLocal.DefaultPropertiesToSend.Recoverable = true;
                        singleLocal.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

                        try
                        {
                            // processing the message and insert it into database
                            // workflow completed here, so we can safely remove the message from queue

                            var localM = singleLocal.ReceiveById(stupid_closure.Id);
                            var localSample = (Data)localM.Body;

                            Interlocked.Increment(ref __counter);
                            Console.WriteLine(Interlocked.Read(ref __counter));
                        }
                        catch (MessageQueueException ex) { if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString()); }
                        catch (Exception ex2) { Trace.WriteLine(ex2.ToString()); }
                    }
                }, TaskCreationOptions.PreferFairness);

                tlist.Add(t);
            }

            foreach (var t in tlist) t.Start();

            Task.WaitAll(tlist.ToArray());

            list.Clear();
        }
        watch.Stop();
        Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
        Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);

        Console.WriteLine("press any key to continue ...");
        Console.ReadKey();
    }
    static long __counter = 0;
}
4

3 回答 3

1

在进行基准测试时,将代码保持在最低限度很重要,以避免背景噪声干扰测试。

不幸的是,您的测试非常嘈杂,以至于很难找到导致延迟的确切原因

  • 不要使用线程。多线程很少有帮助,而且通常弊大于利。
  • 只测试一件事。当测试 ReceiveById 不要使用 GetMessageEnumerator2 时,它的成本很高,您需要在最后将其从结果中删除。
  • 只创建一次 MessageQueue 并重用它。我们只测试 ReceiveById 而不是创建新的 MessageQueue 类。

我重写了测试并收到了更好的结果 MSMQ 不是块上最快的队列,但它并不慢。

    var qName = @".\private$\deep_den";

    if (!MessageQueue.Exists(qName))
    {
        var q = MessageQueue.Create(qName);
    }

    var single = new MessageQueue(qName);
    single.UseJournalQueue = true;
    single.DefaultPropertiesToSend.AttachSenderId = false;
    single.DefaultPropertiesToSend.Recoverable = true;
    single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

    var count = 500;
    var watch = new Stopwatch();

    watch.Start();
    for (int i = 0; i < count; i++)
    {
        var data = new Data { Name = string.Format("name_{0}", i), Value = i };

        single.Send(new Message(data));
    }
    watch.Stop();

    Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
    Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);

    var enu = single.GetMessageEnumerator2();

    watch.Reset();
    watch.Start();

    var queue = new MessageQueue(qName);
    queue.UseJournalQueue = true;
    queue.DefaultPropertiesToSend.AttachSenderId = false;
    queue.DefaultPropertiesToSend.Recoverable = true;
    queue.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

    List<Data> lst = new List<Data>();
    while (lst.Count != count && enu.MoveNext(TimeSpan.FromDays(1)))
    {
        var message = queue.ReceiveById(enu.Current.Id);
        lst.Add((Data)message.Body);
    }
    watch.Stop();
    Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
    Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);

    Console.WriteLine("press any key to continue ...");
    Console.ReadKey();

于 2014-04-01T21:00:29.820 回答
1

Kaveh,您正在使用的 MessageQueue 对象的构造函数将 UseJournalQueue 属性设置为 true,以防启用 Message Queuing 对象的日志设置。不知何故,它认为 .\private$\deep_den 的日志设置已启用。编辑 - 您是否使用预先创建的队列?

于 2013-01-08T11:19:54.617 回答
0

Kaveh,我在这里可能完全错了,但我认为你的问题是 XML 序列化。一旦创建了 XmlSerializer,它仍然可能很慢,但构造函数确实需要时间。

我建议要么完全删除序列化并将数据作为字符串读取,要么事先创建一个 XmlSerializer 或 XmlMessageFormatter 并将其传递给线程。我会说要小心线程问题,但看起来你对此有很好的把握。

于 2013-01-29T12:58:34.857 回答