2

我有一种情况,我需要在由控制 observable 定义的时间对几个 observable 进行采样。

我会直接跳到大理石图。

下面的 co 是控制 observable (Observe.Interval(TimeSpan.FromSeconds(1))

o* 是不同类型的 observables

co tick  ---------x---------x---------x---------x---------x---------x-
o1 int   --2----1-------4--------3--------1-------2----3-------5------
o2 bool  -t-----------f---------t---------------------------f---------

result   ---------Z---------Z---------Z---------Z---------Z---------Z-

我需要开发一种扩展方法,仅在控制通道上有刻度时才对每个 o-observable 的最新值进行采样。

Z 将是从 o-observables 传递最新采样值到选择器函数的结果。它有点像 CombineLatest 但不完全一样。

即,作为一个非常简化的示例,假设 func 看起来像这样:

 (i, b) => {if (b) return i; else return 0; }

我希望这种情况下的结果是

 result   ---------1---------0---------3---------1---------3---------0-

第一个 1 是因为 o2 最后是真的,而 o1 是最后一个 1 第二个 0 是因为 o2 最后是假的。

请注意,o2 并不总是在每个样本之间产生一个值。我仍然需要获取最后一个值。(.Sample() 不起作用。)。实际上,所涉及的函数和类型更复杂,所以不要因为上面的 int 和 bool 类型而做出假设。

另外,我需要选择器功能在每个滴答声中只运行一次。

这是我目前的解决方案,但它不符合上述要求,因为我在 CombineLatest 之后采样。

 var combined = interval.CombineSampled(
            Status,
            H1,
            H2
            M,
            T,
            HMode,
            TMode,
            (i, s, hr1, hr2, m, t, hMode, tMode) =>
            {
               ... (left out)

               return result;
            });

组合采样:

public static IObservable<TResult> CombineSampled<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this IObservable<TSource1> controllingSource, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> selector)
    {
        return controllingSource.Publish(s => s.CombineLatest(source2, source3, source4, source5, source6, source7, source8, selector).SampleEx(s));
    }


public static IObservable<T> SampleEx<T, S>( this IObservable<T> source, IObservable<S> samples )
    {
        // This is different from the Rx version in that source elements will be repeated, and that
        // we complete when either sequence ends. 
        return Observable.Create( ( IObserver<T> obs ) =>
            {
                object gate = new object();
                bool hasSource = false;
                var value = default(T);

                return new CompositeDisposable(
                    source.Synchronize( gate ).Subscribe( v => { value = v; hasSource = true; }, obs ),
                    samples.Synchronize( gate ).Subscribe( _ => { if ( hasSource ) obs.OnNext( value ); }, obs )
                );
            } );
    }
4

1 回答 1

0

把你改成CombineSampled这样:

public static IObservable<TResult> CombineSampled<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this IObservable<TSource1> controllingSource, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> selector)
{
    return controllingSource.Publish(s => s
        .CombineLatest(source2, source3, source4, source5, source6, source7, source8,
           Tuple.Create<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8>)
        .SampleEx(s)
        .Select(t => selector(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5, t.Item6, t.Item7, t.Item8));
}
于 2013-06-12T16:39:52.647 回答