1

我有一个MessageReceiver正在从队列中抽取消息:

var factory = MessagingFactory.CreateFromConnectionString(connectionString);

var receiver = factory.CreateMessageReceiver(queuePath);

receiver.OnMessageAsync(HandleBrokeredMessageAsync);

HandleBrokeredMessageAsync是我的代表,接收者将消息注入其中。

当我调用Close()接收器时,它将停止从队列中抽取更多消息。为了避免潜在的竞争条件,我想确保在返回控制权之前所有待处理的处理都已完成。

我考虑过将每个调用跟踪HandleBrokeredMessageAsync到 aConcurrentBag<T>中,并在它们完成时将它们从包中取出。我会使用 aBlockingCollection<T>来阻止进程,直到排水完成,但不清楚何时调用CompleteAdding():我会在调用后调用它,但调用和随后传递给处理程序的消息Close()之间是否存在间隙?Close()

receiver.Close();
pendingMessages.CompleteAdding(); 
// Can additional messages be pumped after this?
4

1 回答 1

0

看看这里的示例,因为它使用 ManualResetEvent 来协调 Run() 方法的关闭和 Close 操作。类似的方法可能适用于您在消息处理中检查并在那里停止(不接受下一条消息进行处理),然后在所有这些并发处理器完成时调用 close?

https://stackoverflow.com/a/16739691/1078351

于 2013-10-28T17:59:03.977 回答