5

你知道如何订阅保持 x 时间的条件状态吗?

例如,如果我有BehaviorSubject<int>0 - 100 之间的表示 int 值,并且该值随时间变化,我想在该值连续 10 秒低于 50 时订阅。

如果值暂时回到 50 以上,然后再次下降 50,我想再次计数 10 秒。我怎样才能做到这一点?

非常感谢!

4

3 回答 3

7

这是一个相当简洁的可观察查询,对我来说似乎可以正常工作:

var below50longerthan10seconds =
    subject
        .Select(x => x < 50)
        .DistinctUntilChanged()
        .Select(x =>
            Observable.Delay(
                Observable.Return(x),
                TimeSpan.FromSeconds(10.0)))
        .Switch()
        .Where(x => x)
        .Select(x => Unit.Default);

这是故障。

将值从 0 更改为 100 到true小于 50 时,false否则:

        .Select(x => x < 50)

只保留true&之间的实际变化false

        .DistinctUntilChanged()

将值投影到延迟 10 秒的新 observable 中:

        .Select(x =>
            Observable.Delay(
                Observable.Return(x),
                TimeSpan.FromSeconds(10.0)))

现在执行一个.Switch()- 如果一个新x的在延迟的 observable 之前出现,那么它会被忽略,因为一个新的延迟的 observable 即将到来:

        .Switch()

仅选择true原始流低于 50 时的值:

        .Where(x => x)

选择只是因为没有任何值Unit.Default的值流很奇怪:truefalse

        .Select(x => Unit.Default);

所以现在你有一个IObservable<Unit>当原始流产生的值小于 50 并且在 10 秒内没有产生大于或等于 50 的值时释放一个新值。

那是你想要的吗?

于 2012-08-01T02:07:05.073 回答
4

我希望我不会错过任何东西,但我认为最简单的方法是:

BehaviorSubject<int> value = new BehaviorSubject<int>(0);

value.Select(v => v < 50).DistinctUntilChanged().Throttle(TimeSpan.FromSeconds(10))
.Where(x => x).Subscribe(b => DoSomething());

Enigmativity的答案让我开始如何处理这个问题。谢谢!

于 2012-08-01T09:11:13.293 回答
1

@GideonEngelberth 提出了一个很好的观点,即我的原始解决方案(如下)不起作用。我从@Enigmativity 中窃取了一些想法,但是在他没有实际使用“切换流”从源中检索值的地方,我已经采取了额外的步骤。

结果是这个扩展函数:

public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
    var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them

    var switches = published.Select(pred).DistinctUntilChanged();

    return published.Window(switches.Where(x => x), _ => switches.Where(x => !x))
                    .SelectMany(xs => xs.SkipUntil(Observable.Timer(minTime)).TakeWhile(pred));
}

这使用为我们提供了一个流,当我们低于阈值 ( )Select(pred).DistinctUntilChanged()时触发一次,并且每当我们超过阈值 ( ) 时触发一次。truefalse

true然后我们利用这个流在和之间的可观察对象上创建一个窗口false。我们跳过该新流的前 N ​​秒,并在我们仍然低于阈值时进行。


原文:(不工作)

这个问题分解为:

当流低于 50 时,跳过前 10 秒的值,然后返回流。这样做直到流超过 50。然后重新开始。

RX 的美妙之处在于它可以按原样翻译为相关功能。这是一个简单的扩展方法,可以做到这一点:

public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
    var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them

    var openers = published.Where(pred);            // start taking at this point
    var closers = published.Where(z => !pred(z));   // stop taking at this point

    return openers.SkipUntil(Observable.Timer(minTime))
                  .TakeUntil(closers)
                  .Repeat();
}

这是一个显示它工作的测试:

var ws = Observable.Repeat(1,10);
var xs = Observable.Repeat(2,10);
var ys = Observable.Repeat(100,10);
var zs = ws.Concat(xs.Delay(TimeSpan.FromSeconds(2)).Concat(ys)).Repeat(5);

zs.WhereTimed(z => z <= 50, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);

这里的zs流是:

(1 x 10) <2 second pause> (2 x 10) (100 x 10) [repeat 5 times]

根据我们的规则,这应该会导致前 2 秒的暂停触发流,并显示该2值 10 次。100但是,当被击中时应该立即重置,然后在1播放时,没有足够的暂停1来显示任何内容。然后重复几次,只显示数字2

于 2012-07-31T21:35:33.427 回答