7

我有一个生成工作的进程和一个BlockingCollection<>消耗该工作的第二个进程。当我关闭我的程序时,我需要我的消费者停止消费工作,但我仍然需要快速记录正在等待但尚未消费的工作。

现在,我的消费者产生了一个有foreach (<object> in BlockingCollection.GetConsumingEnumerable())循环的线程。当我停止我的程序时,我的制作人会调用Consumer.BlockingCollection.CompleteAdding(). 我发现我的消费者继续处理队列中的所有内容。

谷歌搜索的问题告诉我,我需要使用CancellationToken. 所以我试了一下:

private void Process () { // This method runs in a separate thread
    try {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
            // Consume
        }
    }
    catch (OperationCancelledException) {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable()) {
            // quickly log
        }
    }
}

我的制片人有:

private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn it's consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();

当我尝试这个时,没有任何迹象表明 OperationCancelledException 曾经发生过。

这个问题试图解释取消令牌的使用,但似乎没有正确使用它。(争论:如果它有效,那么它“足够正确”。)这个问题似乎与我的问题完全相同,但没有例子。(这里也一样。)

所以重申一下:如何正确使用CancellationTokenonBlockingCollection.GetConsumingEnumerable()并注意在使用其他方法取消队列后需要处理队列中的剩余项目?

(我认为我的问题集中在正确使用 CancellationToken 上。我的测试都没有表明该过程实际上被取消了。(StopFlag.IsCancellationRequested总是等于false。))

4

2 回答 2

5

当您传入时CancellationTokenGetConsumingEnumerable它不会抛出请求取消的异常,它只会停止吐出项目。与其捕获异常,不如检查令牌:

foreach (var item in BlockingCollection.
    GetConsumingEnumerable(CancellationToken))
{
    //consume item
}
if (CancellationToken.IsCancellationRequested)
    foreach (var item in BlockingCollection)
    {
        //log item
    }

另请注意,如果请求取消,并且可能CompletedAdding尚未调用,那么您应该只迭代集合,而不是调用GetConsumingEnumerable。如果您知道生产者将在取消操作时完成添加,那么这不是问题。

于 2013-11-11T17:40:10.763 回答
3

我的问题在于我如何尝试取消操作。我没有让生产者拥有 CancellationTokenSource,而是将其全部放在消费者中。

public class cProducer {
    private cConsumer myConsumer = new cConsumer ();

    public void onStart () {
        myConsumer.OnStart ();
    }

    public void onStop () {
        myConsumer.OnStop ();
    }

    public void OnOrderReceived (cOrder newOrder) {
        myConsumer.orderQueue.Add (cOrder);
    }
}

public class cConsumer {
    private CancellationTokenSource stopFlag;
    public BlockingCollection<cOrder> orderQueue = new BlockingCollection<cOrder> ();
    private Task processingTask;

    public void OnStart () {
        stopFlag = new CancellationTokenSource ();
        processingTask = Task.Factory.StartNew (() => Process ());
    }

    public void OnStop () {
        stopFlag.Cancel ();
        orderQueue.CompleteAdding ();
        processingTask.Wait ();
    }

    private void Process () {
        try {
            foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) {
                // process
            }
        }
        catch (OperationCanceledException) {
            foreach (cOrder cancelledOrder in orderQueue.GetConsumingEnumerable ()) {
                // log it
            }
        }
    }
}
于 2013-11-11T18:14:00.250 回答