在我的应用程序中,我必须监听多个不同的队列并反序列化/调度在队列中收到的传入消息。
实际上,为了实现这一点,我正在做的是每个 QueueConnector 对象在构造时创建一个新线程,该线程执行一个无限循环,并阻塞调用 queue.Receive() 以接收队列中的下一条消息,如下面的代码所示:
// Instantiate message pump thread
msmqPumpThread = new Thread(() => while (true)
{
// Blocking call (infinite timeout)
// Wait for a new message to come in queue and get it
var message = queue.Receive();
// Deserialize/Dispatch message
DeserializeAndDispatchMessage(message);
}).Start();
我想知道是否可以使用 Task(s) 替换此“消息泵”,而不是通过新线程上的无限循环。
我已经为消息接收部分做了一个任务(见下文),但我真的不知道如何将它用于消息泵(我可以一遍又一遍地回忆完成相同的任务,继续,替换无限循环如上面代码中的单独线程?)
Task<Message> GetMessageFromQueueAsync()
{
var tcs = new TaskCompletionSource<Message>();
ReceiveCompletedEventHandler receiveCompletedHandler = null;
receiveCompletedHandler = (s, e) =>
{
queue.ReceiveCompleted -= receiveCompletedHandler;
tcs.SetResult(e.Message);
};
queue.BeginReceive();
return tcs.Task;
}
在这种情况下,通过在单独的线程(使用阻塞调用 => 阻塞线程)中使用任务而不是无限循环,我会获得什么吗?如果是的话,如何正确地做到这一点?
请注意,此应用程序没有很多 QueueConnector 对象,也没有(可能最多 10 个连接器),这意味着通过第一个解决方案最多有 10 个线程,因此内存占用/性能启动线程在这里不是问题。我宁愿考虑调度性能/ CPU 使用率。会有什么不同吗?