这里有两个方便的运算符用于取消可观察序列。它们之间的区别在于取消时会发生什么。TakeUntil
导致序列正常完成 ( OnCompleted
),而导致WithCancellation
异常终止 ( OnError
)。
/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
this IObservable<TSource> source, CancellationToken cancellationToken)
{
return source
.TakeUntil(Observable.Create<Unit>(observer =>
cancellationToken.Register(() => observer.OnNext(default))));
}
/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
this IObservable<TSource> source, CancellationToken cancellationToken)
{
return source
.TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
o.OnError(new OperationCanceledException(cancellationToken)))));
}
使用示例:
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var pulses = Observable
.Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
.WithCancellation(cts.Token);
注意:在取消的情况下,上面介绍的自定义操作符会立即从底层 observable 取消订阅。如果 observable 包含副作用,则需要考虑这一点。将执行副作用的操作符放在TakeUntil(cts.Token)
前面将推迟整个 observable 的完成,直到副作用完成(优雅终止)。将其放在副作用之后将使取消立即生效,从而可能导致任何正在运行的代码以一种即发即弃的方式继续运行而不被观察。