3

我目前正在从我的 MSMQ 中读取数据(为简洁起见):

public void Start()
{
    this.queue.ReceiveCompleted += this.ReceiveCompleted;
    this.queue.BeginReceive();
}

void ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
    this.queue.EndReceive(e.AsyncResult);

    try
    {
        var m = e.Message;
        m.Formatter = this.formatter;

        this.Handle(m.Body);
    }
    finally
    {
        this.queue.BeginReceive();
    }
}

但是,这只允许我串行处理消息。如何修改此代码以允许并行消息处理?

我知道我可以将其this.queue.BeginReceive();移出finally并移入顶部,ReceiveCompleted但是如何阻止产生与我有消息一样多的线程?如何明智地控制并行度,以免线程池泛滥?是否有一些内置机制,或者我必须编写自己的经理?

编辑:我的目标是更快地处理消息。消息的处理涉及对第 3 方的异步调用,因此目前我的实现正在浪费大量时间来通过队列。

谢谢

4

2 回答 2

4

我认为只托管更多队列阅读器实例会更简单。然后,您可以根据需要通过部署/取消部署更多实例来快速扩展和缩减。

它也成为管理问题,而不是发展问题,这就是扩展应该是的。

于 2013-01-10T11:29:39.007 回答
0

您可以使用“生产者消费者模式”...

.NET 4 及更高版本具有Concurrent线程安全的集合,并且实现了“大部分无锁”(因此在多线程中表现良好)......

您可以BlockingCollection与 TPL 结合使用来实现您想要的,而不必担心线程池饥饿或类似情况......您只需将线路更改为类似的this.Handle(m.Body); 东西MyBlockingCollection.Add(m.Body);并启动“消费者线程”,这些线程可以工作MyBlockingCollection并执行实际工作(即this.Handle调用例如MyBlockingCollection,他们通过调用获得的下一个项目TryTake)...有关基本示例,请参见上面的链接...

于 2013-01-10T11:20:20.330 回答