我从Ollie Riches 的博客文章“尝试使用 Rx 实现更多功能”中阅读了以下内容,并与作者一样想知道:为什么OnCompleted没有通过?有人可以告诉这里发生了什么吗?也许是一些简单得令人尴尬的事情?
为方便起见,代码在此处进行了一些修改和复制(如果在这里撕掉他的代码是不可接受的,我向 Ollie 道歉):
public static class RxExtensions
{
public static IObservable<T> Suspendable<T>(this IObservable<T> stream, IObservable<bool> suspend, bool initialState = false)
{
return Observable.Create<T>(o =>
{
var disposable = suspend.StartWith(initialState)
.DistinctUntilChanged()
.Select(s => s ? Observable.Empty<T>() : stream)
.Switch()
.Subscribe(o);
return disposable;
});
}
}
var testScheduler = new TestScheduler();
var generatorCount = 10;
//If the limit will be hardcoded to something less than generatorCount, an exception will be
//thrown and the exception object will be set. Why it doesn't happen to completed in the following?
var generator = Observable.Generate(1,
x => x <= generatorCount,
x => x + 1,
x => { if(x != 11) { Console.WriteLine(x); return x; } else { throw new ArgumentException(); } },
x => TimeSpan.FromSeconds(1),
testScheduler);
Exception exception = null;
var completed = false;
generator.Suspendable(new Subject<bool>()).Subscribe(_ => { }, e => exception = e, () => completed = true);
testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1001000).Ticks);
Console.WriteLine(exception);
Console.WriteLine(completed);
为了记录,我正在考虑尝试生成一个可以暂停和停止的流,区别在于暂停流累积事件,暂停只是跳过它们。它开始看起来比我预期的要复杂一些,特别是如果有人想对暂停的位设置限制或“保存策略”。那好吧...
<edit:有趣的是,我刚刚注意到 Pausable 的RxJS 实现。