1

我喜欢 Rx,但我一直遇到一个问题。

假设我们有一个单独的上游IObservable<Foo>N下游序列,每个序列只对满足一些简单谓词(比如foo.bar == someKey)的那些 Foo 感兴趣。

当然,这对Where()操作员来说是一项简单的工作:

IObservable<Foo> foos = ...;
foos.Where(foo => foo.bar == "abc").Subscribe(f => A(f));
foos.Where(foo => foo.bar == "xyz").Subscribe(f => B(f));
foos.Where(foo => foo.bar == "bla").Subscribe(f => C(f));
...
[many more subscriptions for different bar values]

这里本质上会发生的是,对于每个Foo产生的上游,Where()谓词将被评估那个Foo N时间。它就像一个线性搜索来查找所有想要这个的订阅者Foo。这一切都很好,这正是我们(应该)Where()在这里使用的期望。

我遇到的问题是,就我而言,N可能非常大,但想要任何特定的订阅者子集Foo非常小。通常,每个Foo. 这意味着当我可以进行非常有效的查找以找到也Foo需要传播的少数下游序列时,我实际上是在进行缓慢的线性搜索。我的应用程序在性能非常关键的环境中运行,我无法承受这种低效率。

我绞尽脑汁试图找到一些更有效的优雅方法,但我只能提出涉及存储大量状态(映射订阅者等)并且必须非常小心地管理并发的解决方案,这打败了很多首先使用 Rx 的目的。就现有运营商而言,我更喜欢某种方式来处理这个问题。有没有人处理过这个问题,或者知道一个好的解决方案?我很乐意提供更多细节。

编辑

我想我的例子有点过于简单了。我不是在处理与某个已知范围内的数值匹配的情况。N 仅用于说明目的。更新了上面的示例。

4

2 回答 2

3

在 Codeplex 的 Rx 讨论板上从 Dave Sexton 那里得到了一个很好的解决方案:

https://rx.codeplex.com/discussions/439717

将GroupByGroupByUntilPublish一起使用怎么样?

例如:(未测试

IConnectableObservable<IGroupedObservable<string, Foo>> foosByBar = 
    (from foo in foos
     group foo by foo.bar)
    .Publish();

foosByBar.Where(g => g.Key == "abc").Take(1).SelectMany(g => g).Subscribe(A);
foosByBar.Where(g => g.Key == "xyz").Take(1).SelectMany(g => g).Subscribe(B);
foosByBar.Where(g => g.Key == "bla").Take(1).SelectMany(g => g).Subscribe(C);

foosByBar.Connect();

GroupBy对每个键使用字典查找来找到推送值的适当可观察对象。

Publish广播 group-by,以便所有观察者共享字典查找操作。

Where / Take只执行一次谓词以定位适当的组,然后它接收该组中每个值的广播以及对同一键感兴趣的任何其他观察者。

请注意,GroupBy不会重播IGroupedObservable,因此您必须在连接之前设置所有订阅。如果您宁愿使用RefCount而不是Connect,那么也许您应该考虑将Replay运算符应用于GroupBy的结果 。

于 2013-04-10T15:44:21.853 回答
0

有些东西正在存储状态,现在它只是存储您通过 Wheres 添加的所有订阅者的可观察对象。不清楚你是否意识到这一点,但 foos 必须在每条消息上通知它的每个观察者。Where所做的只是让大多数观察者简单地检查谓词并返回,但每个谓词都会检查每条消息。

构建一个包含为观察者的处理程序地图不会太困难,并且应该为您带来所需的性能提升。只需根据需要注册尽可能多的处理程序,然后将地图订阅到源 observable。如果 aDictionary不提供您需要的匹配语义,您将不得不提出一些其他方案来减少查找,但总体思路是相同的。请注意,如果它有多个应处理的输入,您可以多次注册同一个处理程序,并且您可以为同一输入注册多个处理程序。

class ObserverMap<T> : IObserver<T>
{
    ObserverMap(Action<Exception> onError, Action onCompleted)
    {
        _onError = onError;
        _onCompleted = onCompleted;
        _handlers = new Dictionary<T, List<Action<T>>>();
    }
    ObserverMap(Action<Exception> onError, Action onCompleted, IEqualityComparer<T> comparer) 
    {
        _onError = onError;
        _onCompleted = onCompleted;
        _handlers = new Dictionary<T, List<Action<T>>>(comparer);
    }

    int _stopped;
    Dictionary<T, List<Action<T>>> _handlers;
    Action<Exception> _onError;
    Action _onCompleted;

    public void OnCompleted()
    {
        if (System.Threading.Interlocked.Exchange(ref _stopped, 1) == 0)
        {
            if (_onCompleted != null) _onCompleted();
        }
    }

    public void OnError(Exception error)
    {
        if (System.Threading.Interlocked.Exchange(ref _stopped, 1) == 0)
        {
            if (_onCompleted != null) _onCompleted();
        }
    }

    public void OnNext(T value)
    {
        if (_stopped != 0) return;

        List<Action<T>> match;
        if (_handlers.TryGetValue(value, out match))
        {
            foreach (var handler in match)
            {
                handler(value);
            }
        }
    }

    public IDisposable RegisterHandler(T key, Action<T> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");

        List<Action<T>> match;
        if (!_handlers.TryGetValue(key, out match))
        {
            match = new List<Action<T>>();
            _handlers.Add(key, match);
        }
        match.Add(handler);

        return System.Reactive.Disposables.Disposable.Create(() => match.Remove(handler));
    }
}
于 2013-04-09T22:52:06.883 回答