2

我将传入连接公开为:

IObservable<double>

每次建立新连接时,我都会通过以下方式通知它:

IObservable<IObservable<double>> m_IncomingConnections;

我的问题是我想建立一个我可以用来做的操作员

IObservable<IList<double>> m_joined = m_IncomingConnections.JoinAll();

所以假设一开始我有三个传入连接,m_joined observable 将推送具有三个元素的列表。

|--a1----a2----------a3-------
|b1---b2----b3--b4---------b5-
|---c1----c2---c3--c4---------

结果:

>[a1,b1,c1]
>[a1,b2,c1]
>[a2,b2,c1]
>[a2,b2,c2]
>[a2,b3,c2]

你明白了,但是如果有新的连接进来,我希望数组在新连接推送新值后立即变为 4 大小。

所有这一切都是为了让我现在可以使用

m_joined.Select(vv => vv.Average());

每次推入新值时流式传输所有流的平均值。

关于为 Rx 实现这样的扩展有什么建议吗?

谢谢您的帮助。

4

1 回答 1

1

主要问题是将可观察的列表转换为可观察的列表。

IObservable<List<T>> CombineNLatest<T>(List<IObservable<T>> list){

    return list
        .Aggregate
            ( Observable.Return(new List<T>())
            , (a, n) => Observable.CombineLatest
                ( a
                , b
                , (l, e) => l.Concat(e).ToList()));
}

类似的东西虽然我没有测试过。现在你有这个问题,你需要建立你的列表>

IObservable<List<IObservable<T>> 
CollectList<T>
(this IObservable<IObservable<T>> input)
{
    var initial = new List<IObservable<T>>();
    return Observable.Create(observer => {

        return input.Subscribe( o => {

            initial.Append(o);
            observer.OnNext(initial);

        }

    });
}

再次没有经过测试,但我希望总体思路很清楚。现在将它们链接在一起

IObservable<IObservable<T>> input = ...;
IObservable<IObservable<List<T>>> intermediate = input
   .CollectList()
   .Select( list => CombineNLatest(list) );

IObservable<List<T>> final = intermediate.Switch();
于 2013-04-21T20:09:44.270 回答