0

我从Ollie Riches 的博客文章“尝试使用 Rx 实现更多功能”中阅读了以下内容,并与作者一样想知道:为什么OnCompleted没有通过?有人可以告诉这里发生了什么吗?也许是一些简单得令人尴尬的事情?

为方便起见,代码在此处进行了一些修改和复制(如果在这里撕掉他的代码是不可接受的,我向 Ollie 道歉):

public static class RxExtensions
{
    public static IObservable<T> Suspendable<T>(this IObservable<T> stream, IObservable<bool> suspend, bool initialState = false)
    {
        return Observable.Create<T>(o =>
        {
            var disposable = suspend.StartWith(initialState)
                    .DistinctUntilChanged()
                    .Select(s => s ? Observable.Empty<T>() : stream)
                    .Switch()
                    .Subscribe(o);

            return disposable;
        });
    }
}

var testScheduler = new TestScheduler();
var generatorCount = 10;

//If the limit will be hardcoded to something less than generatorCount, an exception will be
//thrown and the exception object will be set. Why it doesn't happen to completed in the following?
var generator = Observable.Generate(1,
    x => x <= generatorCount,
    x => x + 1,
    x => { if(x != 11) { Console.WriteLine(x); return x; } else { throw new ArgumentException(); } },
    x => TimeSpan.FromSeconds(1),
    testScheduler);


Exception exception = null;
var completed = false;
generator.Suspendable(new Subject<bool>()).Subscribe(_ => { }, e => exception = e, () => completed = true);   
testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1001000).Ticks);

Console.WriteLine(exception);
Console.WriteLine(completed);

为了记录,我正在考虑尝试生成一个可以暂停和停止的流,区别在于暂停流累积事件,暂停只是跳过它们。它开始看起来比我预期的要复杂一些,特别是如果有人想对暂停的位设置限制或“保存策略”。那好吧...

<edit:有趣的是,我刚刚注意到 Pausable 的RxJS 实现

4

2 回答 2

1

您的观察者订阅了suspend流和source流。在两个流都完成之前,该组合流不会完成。基本上您的source流已完成,但Suspendable正在等待查看是否会有更多暂停/取消暂停信号通过。如果他们这样做,它将重新订阅源流。

在源流完成时让可暂停的流完成是可能的,但可能会破坏您的方法的目的。基本上必须保持订阅源流并在源完成时结束暂停的流。你可以这样做:

var shared = stream.Publish();
var pausable = suspend
    .StartWith(initialState)
    .TakeUntil(shared.LastOrDefaultAsync())
    .DistinctUntilChanged()
    .Select(p => p ? shared : Observable.Empty<T>())
    .Switch();
var disposable = new CompositeDisposable(pausable.Subscribe(o), shared.Connect());
return disposable;
于 2014-10-24T20:57:40.643 回答
0

Completed 未发送,因为您的订阅在 Observable.Empty() 上,而不是您的 _generator 的后代

所以 ai 给你一个更好的答案 CombineLatest

public static IObservable<T> Suspendable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser,
    bool initialState = false)
{
    return 
        source.CombineLatest(pauser.StartWith(initialState), 
                             (value, paused) => new {value, paused})
              .Where(_=>!_.paused)
              .Select(_=>_.value);
}
于 2016-10-09T07:54:58.927 回答