在反应式扩展中,我们有
IObservable<T> Switch(this IObservable<IObservable<T>> This)
我想要一个实施
IObserver<T> Switch(this IObservable<IObserver<T>> This)
这会将传出事件切换到不同的观察者,但呈现为单个观察者。
在反应式扩展中,我们有
IObservable<T> Switch(this IObservable<IObservable<T>> This)
我想要一个实施
IObserver<T> Switch(this IObservable<IObserver<T>> This)
这会将传出事件切换到不同的观察者,但呈现为单个观察者。
这个版本处理了几个问题:
存在可能导致丢失事件的竞争条件。如果观察者在一个线程上观察到一个事件,而源 observable 在另一个线程上产生一个新的观察者,如果你不使用任何类型的同步,你最终可能会OnCompleted
在另一个线程调用之前调用一个线程上的当前OnNext
观察者同一个观察者。这将导致事件丢失。
与上述相关,默认情况下,观察者不是线程安全的。你不应该同时调用观察者,否则你将违反主要的 Rx 合同。如果没有任何锁定,订阅者可能会OnCompleted
在currentObserver
另一个线程调用OnNext
同一个观察者时调用。开箱即用,这种事情可以通过使用同步主题来解决。但是由于前面的问题也需要同步,所以我们可以只使用一个简单的互斥锁。
我们需要一种取消订阅源 observable 的方法。我假设当生成的观察者完成(或出错)时,这是取消订阅源的好时机,因为我们的观察者被告知不要期待更多事件。
这是代码:
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> source)
{
var mutex = new object();
var current = Observer.Create<T>(x => {});
var subscription = source.Subscribe(o =>
{
lock (mutex)
{
current.OnCompleted();
current = o;
}
});
return Observer.Create<T>(
onNext: v =>
{
lock(mutex)
{
current.OnNext(v);
}
},
onCompleted: () =>
{
subscription.Dispose();
lock (mutex)
{
current.OnCompleted();
}
},
onError: e =>
{
subscription.Dispose();
lock (mutex)
{
current.OnError(e);
}
});
}
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> This)
{
IObserver<T> currentObserver = Observer.Create<T>(x => { });
This.Subscribe(o => { currentObserver.OnCompleted(); currentObserver = o; });
return Observer.Create<T>
( onNext: v => currentObserver.OnNext(v)
, onCompleted: () => currentObserver.OnCompleted()
, onError: v => currentObserver.OnError(v));
}