我在使用 MSMQ 时遇到了一些错误,在网上也找不到任何有帮助的资料。以下是堆栈跟踪:
System.Messaging.MessageQueueException (0x80004005): An invalid handle was passed to the function.
at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType)
at System.Messaging.MessageQueue.Peek(TimeSpan timeout, Cursor cursor, PeekAction action)
at Utility.Msmq.MsmqManager.PeekWithoutTimeout(Cursor cursor, PeekAction action)
at Utility.Msmq.MsmqManager.SearchMessages(List`1 labels)
使用以下代码初始化队列:
// Build the path used to open the queue:
//   - loopback address -> local private-queue path
//   - any other IP     -> direct format name addressed over TCP (expects an IP address)
//   - host name        -> direct format name resolved by the OS (expects a computer name)
string queuePath;
if (!isIp)
{
    queuePath = string.Format("FormatName:Direct=OS:{0}\\private$\\{1}", machineName, queueName);
}
else if (machineName == "127.0.0.1")
{
    queuePath = string.Format(@".\Private$\{0}", queueName);
}
else
{
    queuePath = string.Format("FormatName:Direct=TCP:{0}\\private$\\{1}", machineName, queueName);
}

// NOTE(review): MessageQueue.Exists / MessageQueue.Create are documented as not supporting
// format-name syntax or remote private queues - confirm the two FormatName branches above
// are only ever used against queues that already exist, or open them without Exists/Create.
try
{
    _queue = MessageQueue.Exists(queuePath)
        ? new MessageQueue(queuePath)
        : MessageQueue.Create(queuePath, true);
    _queue.SetPermissions("Everyone", MessageQueueAccessRights.FullControl);
}
catch (MessageQueueException)
{
    // Concurrent threads on a new company will try to create the queue at once, which throws.
    // Poll every 100 ms until the queue created by the other thread becomes visible, but give
    // up after a bounded wait so a genuine failure (queue never appears) is not turned into an
    // endless loop; in that case the original exception is rethrown.
    const int pollIntervalMs = 100;
    const int maxPolls = 50;
    var polls = 0;
    while (!MessageQueue.Exists(queuePath))
    {
        if (++polls > maxPolls)
        {
            throw;
        }
        Thread.Sleep(pollIntervalMs);
    }
    _queue = new MessageQueue(queuePath);
}

// Applied after the try/catch so the instance created on the fallback path also
// retrieves the Priority property when messages are read.
_queue.MessageReadPropertyFilter.Priority = true;
下面是对 MSMQ 队列执行 Peek 的代码:
/// <summary>
/// Peeks at a message through <paramref name="cursor"/> using a one-tick timeout,
/// so the call does not wait for new messages to arrive.
/// </summary>
/// <param name="cursor">Cursor that holds the position within the queue.</param>
/// <param name="action">Whether to peek at the current message or move to the next one.</param>
/// <returns>The peeked message, or <c>null</c> when the peek timed out.</returns>
/// <exception cref="MessageQueueException">
/// Rethrown for every queue error other than <see cref="MessageQueueErrorCode.IOTimeout"/>.
/// </exception>
private Message PeekWithoutTimeout(Cursor cursor, PeekAction action)
{
    try
    {
        // TimeSpan(1) is a single tick: effectively "return immediately if nothing is there".
        return _queue.Peek(new TimeSpan(1), cursor, action);
    }
    catch (MessageQueueException mqe)
    {
        // A timeout only means there is no message at this position; report it as null.
        if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
        {
            return null;
        }

        throw;
    }
    catch (Exception generalException)
    {
        Loggers.CreateBillLogger.Error(generalException.Message, generalException);

        // Bare rethrow keeps the original stack trace ("throw generalException;" would reset it).
        throw;
    }
}
/// <summary>
/// Walks the queue from its current head with a cursor and collects, without removing them,
/// every message whose label is contained in <paramref name="labels"/>.
/// </summary>
/// <param name="labels">Labels to match against each message's <c>Label</c>.</param>
/// <returns>The matching messages in the order they were peeked; empty when none match.</returns>
/// <remarks>
/// NOTE(review): <c>_queue</c> looks like it is shared between worker threads; MessageQueue
/// instance members are not documented as thread-safe - confirm each thread uses its own instance.
/// </remarks>
public List<Message> SearchMessages(List<string> labels)
{
    var messages = new List<Message>();

    // Cursor wraps a native handle; dispose it deterministically instead of leaking one per call.
    using (var cursor = _queue.CreateCursor())
    {
        // First peek targets the current position, every later peek advances the cursor.
        // A null result (peek timed out) marks the end of the queue.
        for (var message = PeekWithoutTimeout(cursor, PeekAction.Current);
             message != null;
             message = PeekWithoutTimeout(cursor, PeekAction.Next))
        {
            if (labels.Contains(message.Label))
            {
                messages.Add(message);
            }
        }
    }

    return messages;
}
PS 需要说明的一点是:这段代码运行在多线程环境中(使用 BackgroundWorker),两个或多个线程很有可能同时尝试创建队列,从而导致并发创建。这个问题已在 catch 分支中处理。
我有哪里做错了吗?谢谢!