3

以以下为例:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();

我在这里想要实现的是在任何给定时间“同步”获取序列中最新项目的值。这意味着像这样的扩展FirstAsync无法弥补我。

StartWithandReplay位确保始终有一个值,并且在我的实际代码中需要该位RefCount来检测我何时可以执行一些处置操作。

因此,为了模拟这个“任何给定时间”部分,让我们尝试在 5 秒后获取最新值:

Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.
});

所以有 5 秒的延迟,我需要5从序列中取出值,这些是我迄今为止尝试过的,但没有成功:

  1. ob.First()- 返回 500
  2. ob.Latest().Take(1)- 同上
  3. ob.MostRecent(-1).First()- 同上
  4. ob.MostRecent(-1)- 给了我IEnumerable<long>满满的“500”
  5. ob.Last()- 永远不会返回,因为它正在等待它永远不会完成的序列
  6. ob.Latest().Last()- 同上
  7. ob.ToTask().Result- 同上
  8. ob.ToEnumerable()- 同上
  9. ob.MostRecent().Last()和上面一样

人们实际上可以做到这一点的资源似乎并不多。我能找到的最接近的是:“ Rx: operator for get first and most recent value from an Observable stream ”,但它毕竟不是同步调用(仍然使用订阅),所以它对我不起作用。

有没有人知道这是否真的可行?

4

4 回答 4

3

指出为什么您的代码可能无法按预期工作

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
//Note at this point `ob` has never been subscribed to,
// so the Reference-count is 0 i.e. has not be connected.

Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.

    //Here we make our first subscription to the `ob` sequence.
    //  This will connect the sequence (invoke subscribe)
    //   which will
    //      1) invoke StartWith
    //      2) invoke onNext(500)
    //      3) invoke First()
    //      4) First() will then unsubscribe() as it has the single value it needs
    //      5) The refCount will now return to 0
    //      6) The sequence will be unsubscribed to.
    ob.First().Dump();  

    //Any future calls like `ob.First()` will thus always get the value 500.
});

可能你想要的是

var ob = Observable.Interval(TimeSpan.FromSeconds(1))
    .Publish(500);
var connection = ob.Connect();
//Note at this point `ob` has never been subscribed to, so the ReferenceCount is 0 i.e. has not be connected.

var subscription = Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.
    ob.First().Dump();
});

//Sometime later
subscription.Dispose();
connection.Dispose()

但是,您真的不想将同步调用与 Rx 混合在一起。您通常也不希望在订阅中订阅(.First()订阅也是如此)。您可能想要做的是获取最新值,并将其存储在某个地方。使用.First()只是一个滑坡。你可能会更好地写一些像

var subscription = Observable.Timer(TimeSpan.FromSeconds(5))
    .SelectMany(_=>ob.Take(1))
    .Subscribe(x =>
    {
        //Do something with X here.
        x.Dump();
    }); 
于 2016-10-17T07:41:51.127 回答
0

只是为了澄清这一点,并感谢@LeeCampbell 的回答。

什么不起作用:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    ob.First().Dump();
    // This gives you 500.
    // Because this is the first time any one subscribes to the observable,
    // so it starts right here and gives you the initial value.
});

什么会真正起作用:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
ob.Subscribe(); // Subscribe to start the above hot observable immediately.
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    ob.First().Dump(); 
    // This would give you either 3 or 4, depending on the speed and timing of your computer.
});
于 2016-10-17T23:26:54.760 回答
0

你需要做这样的事情:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500);

var latestAndThenTheRest =
    Observable
        .Create<long>(o =>
        {
            var bs = new BehaviorSubject<long>(1);
            var s1 = ob.Subscribe(bs);
            var s2 = bs.Subscribe(o);
            return new CompositeDisposable(s1, s2);
        });

您在这里唯一需要考虑的是,它ob必须是一个热可观察的,这甚至是有意义的。ob如果天气很冷,那么每个订阅者都会在序列开始时获得全新的订阅。

于 2016-10-17T07:14:28.377 回答
-1

我不确定这个答案是否对您有帮助,但是您是否研究过 BehaviorSubject?这是一个 IObservable,可以记住它的最新值。它有点像一个常规变量和一个可观察变量的组合。

否则,您为什么不自己订阅“ob”并将最新值存储在变量中呢?

于 2016-10-17T06:41:16.807 回答