3

我需要在两个状态之间交替,每个状态都有不同的间隔时间。
我能想到的最好方法是使用 Reactive Extensions 的 Observable.Generate,这非常棒。

根据我在 msdn 和其他网站上阅读的内容,如果可观察对象“正常或异常终止”,则 Observable.Finally() 应该触发。我正在测试以下代码(在 LINQPad 中)以查看它是如何工作的,但我根本无法让 .Final() 触发。

var ia = TimeSpan.FromSeconds(1);
var ib = TimeSpan.FromSeconds(.2);
var start = DateTime.UtcNow;
var ct = new CancellationTokenSource();

var o = Observable.Generate(
    true,
//    s => !ct.IsCancellationRequested,
    s => (DateTime.UtcNow-start) < TimeSpan.FromSeconds(3) && !ct.IsCancellationRequested,
    s => !s,
    s => s ? "on" : "off",
    s => s? ib : ia)
//    .TakeUntil(start+TimeSpan.FromSeconds(3))
    .Concat(Observable.Return("end"));


o.Subscribe( s=> s.Dump(), ct.Token);
var t = o.ToTask(ct.Token);


t.ContinueWith(x => x.Dump("done"));
o.Finally(() => "finallY".Dump()); // never gets called?

Thread.Sleep(10000);
ct.Cancel();

如果我将 Thread.Sleep 设为 10 秒,则可观察序列完成并且 Task.ContinueWith 会触发,但不会触发 .Finally()。

如果我让 Thread.Sleep 2s,可观察序列被取消,Task.ContinueWith 再次触发,但不是 .Finally()。

为什么不?

4

1 回答 1

6

查看Finally方法的返回类型;应该给你一个提示。就像该Concat方法返回一个带有 IObservable序列的新序列,但不更改原始序列一样,该Finally方法返回一个IObservable具有最终操作的新序列,但您订阅的是原始序列IObservable。将以下行放在您的Subscribe电话前面,它会起作用。

o = o.Finally(() => "finallY".Dump());

我同意这是一个奇怪的 API 选择;我认为Finally更像是而Subscribe不是Concat。您正在订阅最终的“事件”;奇怪的是,API强制您创建一个全新的IObservable,然后订阅它只是为了让Finally事情发生。另外,它允许潜在的错误(如果我们在您的问题中使用该函数,则很明显)如果您订阅两次新的IObservable,您的Finally函数将执行两次。因此,您必须确保您的订阅之一在“最终” IObservable上,而其他订阅都在原始订阅上。只是看起来不寻常。

我想考虑它的方式Finally不是要修改 observable,而是要修改订阅本身。即,他们不希望您通常制作具有Finally事物(var o = Observable.[...].Finally(...);)的可公开访问的命名可观察对象,而是要与订阅调用本身内联(var subscription = o.Finally(...).Subscribe(...);

于 2013-07-10T19:38:03.547 回答