4

我正在尝试创建一个异步 ProducerConsumerCollection,为此,我正在使用这个 msdn 页面(http://msdn.microsoft.com/en-us/library/hh873173.aspx(页面底部))。

我现在正在尝试添加超时,这就是我所做的:

    public async Task<T> TakeWithTimeout(int timeout)
    {
            Task<T> takeTask = this.Take();

            if (timeout <= 0 || takeTask == await Task.WhenAny(this.tasks.Take(), Task.Delay(timeout)))
            {
                return await takeTask;
            }
            else
            {
                // Timeout
                return default(T);
            }
        }
    }

这段代码的问题在于,在超时的情况下,它不会取消由 Take() 方法创建的任务。

既然这个任务是由TaskCompletionSource“创建”的,我不能给它一个cancellationToken吗?

那么,如何继续取消它并正确实施这个 Take with timeout ?

谢谢 :)

4

2 回答 2

6

编写一个对取消安全async友好的生产者/消费者集合并非易事。您需要做的是更改Take以接受 aCancellationToken作为参数,并且它应该注册一个处理程序,以便在取消它时取消它TaskCompletionSource

我强烈推荐你使用BufferBlock<T>,它内置了取消支持。

如果您不能使用 TPL Dataflow(例如,您在 PCL 中工作或有 Dataflow 不支持的目标平台),那么您可以使用我的开源AsyncEx 库中的生产者/消费者集合(例如AsyncProducerConsumerQueueAsyncCollection)。这些都基于AsyncLock和,我在我的博客上AsyncConditionVariable简要描述了一个设计(没有涉及取消细节)。使用这种设计在生产者/消费者集合中支持取消的关键是支持在; 一旦您的条件变量类型支持取消,那么您的集合也将轻松支持它。AsyncConditionVariable.WaitAsync

于 2013-07-23T20:12:22.947 回答
2

我将发布我对如何从 TaskCompletionSource 取消任务的问题的解决方案,因为这是我自己需要的。

我猜这可以用于您的特定需求,但它与特定的超时功能无关,所以这是一个通用解决方案(或者我希望如此)。

这是一种扩展方法:

    public static async Task WaitAsync<T>(this TaskCompletionSource<T> tcs, CancellationToken ctok)
    {

        CancellationTokenSource cts = null;
        CancellationTokenSource linkedCts = null;

        try {

            cts = new CancellationTokenSource();
            linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, ctok);

            var exitTok = linkedCts.Token;

            Func<Task> listenForCancelTaskFnc = async () => {
                await Task.Delay(-1, exitTok).ConfigureAwait(false); 
            };

            var cancelTask = listenForCancelTaskFnc();

            await Task.WhenAny(new Task[] { tcs.Task, cancelTask }).ConfigureAwait(false);

            cts.Cancel();

        } finally {

            if(linkedCts != null) linkedCts.Dispose();

        }

    }

用法:

    async Task TestAsync(CancellationToken ctok) {

        var tcs = new TaskCompletionSource<bool>();

        if (somethingOrTheOther) {
            tcs.TrySetResult(true);
        }

        await tcs.WaitAsync(ctok);

    }

这个想法是让监督异步任务基本上永远等待,直到它被取消,我们可以使用它来“提前退出”以防TaskCompletionSource它尚未满足,但由于取消请求,我们无论如何都需要退出。

监督任务保证在结束时被取消,WaitAsync无论它是如何从WhenAny. 要么对TaskCompletionSource结果感到满意,然后WhenAny完成,短暂地让监督睡眠任务保持完整,直到cts.Cancel()调用下一行,或者它被 取消exitToken,这是传入ctok或内部的组合标记cts.Token

无论如何,我希望这是有道理的——如果这段代码有任何问题,请告诉我......

于 2016-02-25T18:43:39.590 回答