1

让我们考虑一下方法:

Task Foo(IEnumerable items, CancellationToken token)
{
    return Task.Run(() =>
    {
        foreach (var i in items)
            token.ThrowIfCancellationRequested();

    }, token);
}

然后我有一个消费者:

var cts = new CancellationTokenSource();
var task = Foo(Items, cts.token);
task.Wait();

和项目的例子:

IEnumerable Items
{
    get
    {
        yield return 0;
        Task.Delay(Timeout.InfiniteTimeSpan).Wait();
        yield return 1;
    }
}

任务呢。等等? 我无法将取消令牌放入项目集合中

如何杀死没有响应的任务或解决这个问题?

4

7 回答 7

3

我找到了一种允许将取消令牌放入来自第三方的项目的解决方案:

public static IEnumerable<T> ToCancellable<T>(this IEnumerable<T> @this, CancellationToken token)
{
    var enumerator = @this.GetEnumerator();

    for (; ; )
    {
        var task = Task.Run(() => enumerator.MoveNext(), token);
        task.Wait(token);

        if (!task.Result)
            yield break;

        yield return enumerator.Current;
    }
}

现在我需要使用:

Items.ToCancellable(cts.token)

并且在取消请求后不会挂起。

于 2013-08-07T13:42:17.187 回答
2

您不能真正取消不可取消的操作。Stephen Toub 在 Parallel FX Team 的博客上的“我如何取消不可取消的异步操作? ”中进行了详细介绍,但本质是您需要了解您真正想要做什么?

  1. 停止异步/长时间运行的操作本身?如果您无法发出操作信号,则无法以合作方式进行
  2. 停止等待操作完成,忽略任何结果?这是可行的,但由于显而易见的原因可能导致不可靠性。您可以使用传递取消令牌的长操作启动任务,或使用斯蒂芬图布描述的 TaskCompletionSource。

您需要确定要找到正确解决方案的行为

于 2013-08-07T14:35:44.467 回答
1

为什么不能将 CancellationToken 传递给Items()

IEnumerable Items(CancellationToken ct)
{
    yield return 0;
    Task.Delay(Timeout.InfiniteTimeSpan, ct).Wait();
    yield return 1;
}

Items()当然,您必须传递与传递给相同的令牌Foo()

于 2013-08-07T12:32:55.367 回答
1

尝试使用 aTaskCompletionSource并返回它。然后,如果内部任务运行完成(或错误),您可以将其设置TaskCompletionSource为结果(或错误)。CancellationToken但是如果被触发,您可以将其设置为立即取消。

Task<int> Foo(IEnumerable<int> items, CancellationToken token)
{
    var tcs = new TaskCompletionSource<int>();
    token.Register(() => tcs.TrySetCanceled());
    var innerTask = Task.Factory.StartNew(() =>
    {
        foreach (var i in items)
            token.ThrowIfCancellationRequested();
        return 7;
    }, token);
    innerTask.ContinueWith(task => tcs.TrySetResult(task.Result), TaskContinuationOptions.OnlyOnRanToCompletion);
    innerTask.ContinueWith(task => tcs.TrySetException(task.Exception), TaskContinuationOptions.OnlyOnFaulted);
    return tcs.Task;
}

这实际上不会杀死内部任务,但它会给你一个任务,你可以在取消后立即继续。要杀死内部任务,因为它在无限超时中挂起,我相信您唯一能做的就是获取对Thread.CurrentThread您开始任务的位置的引用,然后taskThread.Abort()从内部调用Foo,这当然是不好的做法。但是在这种情况下,您的问题实际上归结为“如何在无法访问代码的情况下终止长时间运行的函数”,这只能通过Thread.Abort.

于 2013-08-07T12:55:10.690 回答
1

你可以用 ItemsIEnumerable<Task<int>>代替IEnumerable<int>吗?然后你可以做

return Task.Run(() =>
{
    foreach (var task in tasks)
    {
        task.Wait(token);
        token.ThrowIfCancellationRequested();
        var i = task.Result;
    }
}, token);

尽管使用 Reactive Framework 和做items.ToObservable. 看起来像这样:

static Task<int> Foo(IEnumerable<int> items, CancellationToken token)
{
    var sum = 0;
    var tcs = new TaskCompletionSource<int>();
    var obs = items.ToObservable(ThreadPoolScheduler.Instance);
    token.Register(() => tcs.TrySetCanceled());
    obs.Subscribe(i => sum += i, tcs.SetException, () => tcs.TrySetResult(sum), token);
    return tcs.Task;
}
于 2013-08-07T13:15:14.557 回答
0

如何围绕可在项目之间取消的可枚举创建一个包装器?

IEnumerable<T> CancellableEnum<T>(IEnumerable<T> items, CancellationToken ct) {
    foreach (var item in items) {
        ct.ThrowIfCancellationRequested();
        yield return item;
    }
}

...尽管这似乎是 Foo() 已经做的事情。如果你有一些地方这个可枚举的块实际上是无限的(而且它不仅仅是很慢),那么你要做的就是在消费者端的 task.Wait() 中添加超时和/或取消令牌。

于 2013-08-07T12:42:47.753 回答
0

之前的解决方案是基于一个乐观的假设,即枚举可能不会挂起并且速度非常快。因此我们有时可以牺牲系统线程池中的一个线程?正如Dax Fohl所指出的,即使其父任务已被取消异常杀死,该任务仍将处于活动状态。在这方面,如果多个集合被无限期冻结,这可能会阻塞默认任务调度程序使用的底层 ThreadPool。

因此,我重构了 ToCancellable 方法:

public static IEnumerable<T> ToCancellable<T>(this IEnumerable<T> @this, CancellationToken token)
{
    var enumerator = @this.GetEnumerator();
    var state = new State();

    for (; ; )
    {
        token.ThrowIfCancellationRequested();

        var thread = new Thread(s => { ((State)s).Result = enumerator.MoveNext(); }) { IsBackground = true, Priority = ThreadPriority.Lowest };
        thread.Start(state);

        try
        {
            while (!thread.Join(10))
                token.ThrowIfCancellationRequested();
        }
        catch (OperationCanceledException)
        {
            thread.Abort();
            throw;
        }

        if (!state.Result)
            yield break;

        yield return enumerator.Current;
    }
}

还有一个帮助类来管理结果:

class State
{
    public bool Result { get; set; }
}

中止分离的线程是安全的。

我在这里看到的痛苦是一个沉重的线程创建。这可以通过使用自定义线程池以及能够处理中止异常的生产者-消费者模式来解决,以便从池中删除损坏的线程。

另一个问题是在加入线。这里最好的停顿是什么?也许这应该由用户负责并作为方法参数发送。

于 2013-08-08T09:15:58.870 回答