0

在反应式扩展中,我们有

IObservable<T> Switch(this IObservable<IObservable<T>> This)

我想要一个实施

IObserver<T> Switch(this IObservable<IObserver<T>> This)

这会将传出事件切换到不同的观察者,但呈现为单个观察者。

4

2 回答 2

3

这个版本处理了几个问题:

  • 存在可能导致丢失事件的竞争条件。如果观察者在一个线程上观察到一个事件,而源 observable 在另一个线程上产生一个新的观察者,如果你不使用任何类型的同步,你最终可能会OnCompleted在另一个线程调用之前调用一个线程上的当前OnNext观察者同一个观察者。这将导致事件丢失。

  • 与上述相关,默认情况下,观察者不是线程安全的。你不应该同时调用观察者,否则你将违反主要的 Rx 合同。如果没有任何锁定,订阅者可能会OnCompletedcurrentObserver另一个线程调用OnNext同一个观察者时调用。开箱即用,这种事情可以通过使用同步主题来解决。但是由于前面的问题也需要同步,所以我们可以只使用一个简单的互斥锁。

  • 我们需要一种取消订阅源 observable 的方法。我假设当生成的观察者完成(或出错)时,这是取消订阅源的好时机,因为我们的观察者被告知不要期待更多事件。

这是代码:

public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> source)
{
    var mutex = new object();
    var current = Observer.Create<T>(x => {});
    var subscription = source.Subscribe(o =>
    {
        lock (mutex)
        {
           current.OnCompleted();
           current = o;
        }
    });

    return Observer.Create<T>(
        onNext: v =>
        {
            lock(mutex)
            {                
              current.OnNext(v);
            }
        },
        onCompleted: () =>
        {
             subscription.Dispose();
             lock (mutex)
             {
                 current.OnCompleted();
             }
        },
        onError: e =>
        {
             subscription.Dispose();
             lock (mutex)
             {
                 current.OnError(e);
             }
        });
}
于 2013-04-15T19:46:08.090 回答
1
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> This)
{
    IObserver<T> currentObserver = Observer.Create<T>(x => { });

    This.Subscribe(o => { currentObserver.OnCompleted(); currentObserver = o; });


    return Observer.Create<T>
        ( onNext: v => currentObserver.OnNext(v)
        , onCompleted: () => currentObserver.OnCompleted()
        , onError: v => currentObserver.OnError(v));
}
于 2013-04-15T06:18:38.407 回答