我将传入连接公开为:
IObservable<double>
每次建立新连接时,我都会通过以下方式通知它:
IObservable<IObservable<double>> m_IncomingConnections;
我的问题是我想建立一个我可以用来做的操作员
IObservable<IList<double>> m_joined = m_IncomingConnections.JoinAll();
所以假设一开始我有三个传入连接,m_joined observable 将推送具有三个元素的列表。
|--a1----a2----------a3-------
|b1---b2----b3--b4---------b5-
|---c1----c2---c3--c4---------
结果:
>[a1,b1,c1]
>[a1,b2,c1]
>[a2,b2,c1]
>[a2,b2,c2]
>[a2,b3,c2]
你明白了,但是如果有新的连接进来,我希望数组在新连接推送新值后立即变为 4 大小。
所有这一切都是为了让我现在可以使用
m_joined.Select(vv => vv.Average());
每次推入新值时流式传输所有流的平均值。
关于为 Rx 实现这样的扩展有什么建议吗?
谢谢您的帮助。