我有一种情况,我需要在由控制 observable 定义的时间对几个 observable 进行采样。
我会直接跳到大理石图。
下面的 co 是控制 observable (Observe.Interval(TimeSpan.FromSeconds(1))
o* 是不同类型的 observables
co tick ---------x---------x---------x---------x---------x---------x-
o1 int --2----1-------4--------3--------1-------2----3-------5------
o2 bool -t-----------f---------t---------------------------f---------
result ---------Z---------Z---------Z---------Z---------Z---------Z-
我需要开发一种扩展方法,仅在控制通道上有刻度时才对每个 o-observable 的最新值进行采样。
Z 将是从 o-observables 传递最新采样值到选择器函数的结果。它有点像 CombineLatest 但不完全一样。
即,作为一个非常简化的示例,假设 func 看起来像这样:
(i, b) => {if (b) return i; else return 0; }
我希望这种情况下的结果是
result ---------1---------0---------3---------1---------3---------0-
第一个 1 是因为 o2 最后是真的,而 o1 是最后一个 1 第二个 0 是因为 o2 最后是假的。
请注意,o2 并不总是在每个样本之间产生一个值。我仍然需要获取最后一个值。(.Sample() 不起作用。)。实际上,所涉及的函数和类型更复杂,所以不要因为上面的 int 和 bool 类型而做出假设。
另外,我需要选择器功能在每个滴答声中只运行一次。
这是我目前的解决方案,但它不符合上述要求,因为我在 CombineLatest 之后采样。
var combined = interval.CombineSampled(
Status,
H1,
H2
M,
T,
HMode,
TMode,
(i, s, hr1, hr2, m, t, hMode, tMode) =>
{
... (left out)
return result;
});
组合采样:
public static IObservable<TResult> CombineSampled<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this IObservable<TSource1> controllingSource, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> selector)
{
return controllingSource.Publish(s => s.CombineLatest(source2, source3, source4, source5, source6, source7, source8, selector).SampleEx(s));
}
public static IObservable<T> SampleEx<T, S>( this IObservable<T> source, IObservable<S> samples )
{
// This is different from the Rx version in that source elements will be repeated, and that
// we complete when either sequence ends.
return Observable.Create( ( IObserver<T> obs ) =>
{
object gate = new object();
bool hasSource = false;
var value = default(T);
return new CompositeDisposable(
source.Synchronize( gate ).Subscribe( v => { value = v; hasSource = true; }, obs ),
samples.Synchronize( gate ).Subscribe( _ => { if ( hasSource ) obs.OnNext( value ); }, obs )
);
} );
}