你知道如何订阅保持 x 时间的条件状态吗?
例如,如果我有BehaviorSubject<int>
0 - 100 之间的表示 int 值,并且该值随时间变化,我想在该值连续 10 秒低于 50 时订阅。
如果值暂时回到 50 以上,然后再次下降 50,我想再次计数 10 秒。我怎样才能做到这一点?
非常感谢!
你知道如何订阅保持 x 时间的条件状态吗?
例如,如果我有BehaviorSubject<int>
0 - 100 之间的表示 int 值,并且该值随时间变化,我想在该值连续 10 秒低于 50 时订阅。
如果值暂时回到 50 以上,然后再次下降 50,我想再次计数 10 秒。我怎样才能做到这一点?
非常感谢!
这是一个相当简洁的可观察查询,对我来说似乎可以正常工作:
var below50longerthan10seconds =
subject
.Select(x => x < 50)
.DistinctUntilChanged()
.Select(x =>
Observable.Delay(
Observable.Return(x),
TimeSpan.FromSeconds(10.0)))
.Switch()
.Where(x => x)
.Select(x => Unit.Default);
这是故障。
将值从 0 更改为 100 到true
小于 50 时,false
否则:
.Select(x => x < 50)
只保留true
&之间的实际变化false
:
.DistinctUntilChanged()
将值投影到延迟 10 秒的新 observable 中:
.Select(x =>
Observable.Delay(
Observable.Return(x),
TimeSpan.FromSeconds(10.0)))
现在执行一个.Switch()
- 如果一个新x
的在延迟的 observable 之前出现,那么它会被忽略,因为一个新的延迟的 observable 即将到来:
.Switch()
仅选择true
原始流低于 50 时的值:
.Where(x => x)
选择只是因为没有任何值Unit.Default
的值流很奇怪:true
false
.Select(x => Unit.Default);
所以现在你有一个IObservable<Unit>
当原始流产生的值小于 50 并且在 10 秒内没有产生大于或等于 50 的值时释放一个新值。
那是你想要的吗?
我希望我不会错过任何东西,但我认为最简单的方法是:
BehaviorSubject<int> value = new BehaviorSubject<int>(0);
value.Select(v => v < 50).DistinctUntilChanged().Throttle(TimeSpan.FromSeconds(10))
.Where(x => x).Subscribe(b => DoSomething());
Enigmativity的答案让我开始如何处理这个问题。谢谢!
@GideonEngelberth 提出了一个很好的观点,即我的原始解决方案(如下)不起作用。我从@Enigmativity 中窃取了一些想法,但是在他没有实际使用“切换流”从源中检索值的地方,我已经采取了额外的步骤。
结果是这个扩展函数:
public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them
var switches = published.Select(pred).DistinctUntilChanged();
return published.Window(switches.Where(x => x), _ => switches.Where(x => !x))
.SelectMany(xs => xs.SkipUntil(Observable.Timer(minTime)).TakeWhile(pred));
}
这使用为我们提供了一个流,当我们低于阈值 ( )Select(pred).DistinctUntilChanged()
时触发一次,并且每当我们超过阈值 ( ) 时触发一次。true
false
true
然后我们利用这个流在和之间的可观察对象上创建一个窗口false
。我们跳过该新流的前 N 秒,并在我们仍然低于阈值时进行。
原文:(不工作)
这个问题分解为:
当流低于 50 时,跳过前 10 秒的值,然后返回流。这样做直到流超过 50。然后重新开始。
RX 的美妙之处在于它可以按原样翻译为相关功能。这是一个简单的扩展方法,可以做到这一点:
public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them
var openers = published.Where(pred); // start taking at this point
var closers = published.Where(z => !pred(z)); // stop taking at this point
return openers.SkipUntil(Observable.Timer(minTime))
.TakeUntil(closers)
.Repeat();
}
这是一个显示它工作的测试:
var ws = Observable.Repeat(1,10);
var xs = Observable.Repeat(2,10);
var ys = Observable.Repeat(100,10);
var zs = ws.Concat(xs.Delay(TimeSpan.FromSeconds(2)).Concat(ys)).Repeat(5);
zs.WhereTimed(z => z <= 50, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
这里的zs
流是:
(1 x 10) <2 second pause> (2 x 10) (100 x 10) [repeat 5 times]
根据我们的规则,这应该会导致前 2 秒的暂停触发流,并显示该2
值 10 次。100
但是,当被击中时应该立即重置,然后在1
播放时,没有足够的暂停1
来显示任何内容。然后重复几次,只显示数字2
。