每秒只有 20 条消息!这就是我的全部!这是从队列中查看 50 条消息并使用 ReceiveById 并行接收它们的代码。队列中的消息总数为 500。我也测试了其他数字。但最高限制是每秒 20 条消息!我是在某个完全不碍事的地方吗?
编辑1:
1 - 我需要队列是可恢复的。但有趣的是,即使我将可恢复选项设置为 false;仍然最高限制是 20 条消息/秒。
2 - 我被迫在这里使用 MSMQ,因为涉及到一些遗留应用程序。但如果这段代码是正确的,并且前 20 名的限制确实存在,我可以说服小组切换。因此,任何关于替换 MSMQ 的建议(基于实际经验)都非常受欢迎(请注意,我们需要保留我们的消息,以防出现任何类型的故障)。
3 - 我已将 ThreadPool 中的线程数设置为较高的数字,以防万一,但实际上在此代码中,它将导致创建 100 - 200 个线程。我测试了从 50 到 10000 的不同数字,没有任何区别。
4 - 在每个任务中创建一个新的 MessageQueue,因为 ReceiveById 不是线程安全的。
5 - 正如我们在代码中看到的,消息大小非常小;它只是一个字符串加一个整数。
编辑2:[非常奇怪的新结果]
我玩过这段代码的每一部分,发现了这一点:如果我注释掉singleLocal.UseJournalQueue = false; 在我的任务中,我每秒最多可以阅读 1200 条消息。不令人印象深刻,但在我的情况下可以接受。奇怪的是 UseJournalQueue 的默认值是 false;为什么再次将其设置为 false 会在性能上产生如此大的差异?
static partial class Program
{
static void Main(string[] args)
{
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
var qName = @".\private$\deep_den";
if (!MessageQueue.Exists(qName))
{
var q = MessageQueue.Create(qName);
}
var single = new MessageQueue(qName);
single.UseJournalQueue = false;
single.DefaultPropertiesToSend.AttachSenderId = false;
single.DefaultPropertiesToSend.Recoverable = true;
single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });
var count = 500;
var watch = new Stopwatch();
watch.Start();
for (int i = 0; i < count; i++)
{
var data = new Data { Name = string.Format("name_{0}", i), Value = i };
single.Send(new Message(data));
}
watch.Stop();
Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);
var enu = single.GetMessageEnumerator2();
watch.Reset();
watch.Start();
while (Interlocked.Read(ref __counter) < count)
{
var list = new List<Message>();
var peekCount = 50;
while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
{
try
{
list.Add(enu.Current);
peekCount--;
}
catch (Exception ex2)
{
Trace.WriteLine(ex2.ToString());
break;
}
}
var tlist = new List<Task>();
foreach (var message in list)
{
var stupid_closure = message;
var t = new Task(() =>
{
using (var singleLocal = new MessageQueue(qName))
{
singleLocal.UseJournalQueue = false;
singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
singleLocal.DefaultPropertiesToSend.Recoverable = true;
singleLocal.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });
try
{
// processing the message and insert it into database
// workflow completed here, so we can safely remove the message from queue
var localM = singleLocal.ReceiveById(stupid_closure.Id);
var localSample = (Data)localM.Body;
Interlocked.Increment(ref __counter);
Console.WriteLine(Interlocked.Read(ref __counter));
}
catch (MessageQueueException ex) { if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString()); }
catch (Exception ex2) { Trace.WriteLine(ex2.ToString()); }
}
}, TaskCreationOptions.PreferFairness);
tlist.Add(t);
}
foreach (var t in tlist) t.Start();
Task.WaitAll(tlist.ToArray());
list.Clear();
}
watch.Stop();
Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);
Console.WriteLine("press any key to continue ...");
Console.ReadKey();
}
static long __counter = 0;
}