我有一个生成工作的进程和一个BlockingCollection<>
消耗该工作的第二个进程。当我关闭我的程序时,我需要我的消费者停止消费工作,但我仍然需要快速记录正在等待但尚未消费的工作。
现在,我的消费者产生了一个有foreach (<object> in BlockingCollection.GetConsumingEnumerable())
循环的线程。当我停止我的程序时,我的制作人会调用Consumer.BlockingCollection.CompleteAdding()
. 我发现我的消费者继续处理队列中的所有内容。
谷歌搜索的问题告诉我,我需要使用CancellationToken
. 所以我试了一下:
private void Process () { // This method runs in a separate thread
try {
foreach (*work* in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
// Consume
}
}
catch (OperationCancelledException) {
foreach (*work* in BlockingCollection.GetConsumingEnumerable()) {
// quickly log
}
}
}
我的制片人有:
private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn it's consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();
当我尝试这个时,没有任何迹象表明 OperationCancelledException 曾经发生过。
这个问题试图解释取消令牌的使用,但似乎没有正确使用它。(争论:如果它有效,那么它“足够正确”。)这个问题似乎与我的问题完全相同,但没有例子。(这里也一样。)
所以重申一下:如何正确使用CancellationToken
onBlockingCollection.GetConsumingEnumerable()
并注意在使用其他方法取消队列后需要处理队列中的剩余项目?
(我认为我的问题集中在正确使用 CancellationToken 上。我的测试都没有表明该过程实际上被取消了。(StopFlag.IsCancellationRequested
总是等于false
。))