0

我有一些类似于

observable.Select(x => Observable.FromAsync(token => Task.Run(() =>
{
    // ... do some work ...

    token.ThrowIfCancellationRequested();

    // ... do some more work ...

    return 7 * x;
}, token)))
.Switch()
.Subscribe(x => 
{
    // Whatever
});

内部发生的工作Task可能需要一段时间,所以如果Switch已经转移到另一个任务(因为到达了一个新值observable),我想中止这项工作。

呼吁ThrowIfCancellationRequested()做正确的事吗?即,一旦IObservable返回 fromFromAsync已被处理(并取消令牌),我OperationCanceledException会被忽略 - 还是会在其他地方引起问题?

我已经在一个测试应用程序中尝试过了,它似乎工作正常 - 但这并不意味着它是正确的:)

4

1 回答 1

1

在引擎盖下,task.ToObservable()最终被调用。 ToObservable为最终执行此操作的任务添加延续:

switch (task.Status)
{
case TaskStatus.RanToCompletion:
    subject.OnNext(Unit.Default);
    subject.OnCompleted();
    return;
case TaskStatus.Canceled:
    subject.OnError(new TaskCanceledException(task));
    return;
case TaskStatus.Faulted:
    subject.OnError(task.Exception.InnerException);
    return;
default:
    return;
}

这意味着 observable 将与TaskCanceledException. 但是在您的代码中,当这种情况发生时,没有观察者观察,AsyncSubject所以什么也没有发生。所以看起来还可以。

于 2013-07-25T15:21:48.747 回答