我喜欢 Rx,但我一直遇到一个问题。
假设我们有一个单独的上游IObservable<Foo>
和N
下游序列,每个序列只对满足一些简单谓词(比如foo.bar == someKey
)的那些 Foo 感兴趣。
当然,这对Where()
操作员来说是一项简单的工作:
IObservable<Foo> foos = ...;
foos.Where(foo => foo.bar == "abc").Subscribe(f => A(f));
foos.Where(foo => foo.bar == "xyz").Subscribe(f => B(f));
foos.Where(foo => foo.bar == "bla").Subscribe(f => C(f));
...
[many more subscriptions for different bar values]
这里本质上会发生的是,对于每个Foo
产生的上游,Where()
谓词将被评估那个Foo
N
时间。它就像一个线性搜索来查找所有想要这个的订阅者Foo
。这一切都很好,这正是我们(应该)Where()
在这里使用的期望。
我遇到的问题是,就我而言,N
可能非常大,但想要任何特定的订阅者子集Foo
非常小。通常,每个Foo
. 这意味着当我可以进行非常有效的查找以找到也Foo
需要传播的少数下游序列时,我实际上是在进行缓慢的线性搜索。我的应用程序在性能非常关键的环境中运行,我无法承受这种低效率。
我绞尽脑汁试图找到一些更有效的优雅方法,但我只能提出涉及存储大量状态(映射订阅者等)并且必须非常小心地管理并发的解决方案,这打败了很多首先使用 Rx 的目的。就现有运营商而言,我更喜欢某种方式来处理这个问题。有没有人处理过这个问题,或者知道一个好的解决方案?我很乐意提供更多细节。
编辑
我想我的例子有点过于简单了。我不是在处理与某个已知范围内的数值匹配的情况。N 仅用于说明目的。更新了上面的示例。