一对订阅和取消订阅方法将是非组合的。每个操作员都需要保存一个传入订阅的观察者字典,将它们映射到每个观察者实例上,然后传递给相关的可观察序列(传递给操作员)。
例如,考虑为两个源编写合并运算符。今天,这看起来很像这样(textarea 编译):
static IObservable<T> Merge<T>(IObservable<T> xs, IObservable<T> ys)
{
return Observable.Create<T>(observer =>
{
var n = 2;
var mergeObserver = Observer.Create<T>(
observer.OnNext,
observer.OnError,
() =>
{
// protected by the gate, see use of Synchronize below
if (--n == 0)
observer.OnCompleted();
}
);
var gate = new object();
return new CompositeDisposable(
xs.Synchronize(gate).Subscribe(mergeObserver),
ys.Synchronize(gate).Subscribe(mergeObserver)
);
});
}
如您所见,序列的组合还会导致从 Subscribe 调用返回的 IDisposable 对象的组合。请注意,在 Observable.Create 中发生了很多事情,它会在向给定观察者发送终端消息时自动处理返回的 IDisposable。在这种情况下,对observer.OnError 和observer.OnCompleted 的调用负责在CompositeDisposable 中处理这两个订阅。(但这是一个完全不同的话题,需要一段时间来讨论。)
下面的代码是假设的,假设在 IObservable 上存在订阅/取消订阅对(因此使用具有两个操作的 Create 工厂方法):
static IObservable<T> Merge<T>(IObservable<T> xs, IObservable<T> ys)
{
var map = new Dictionary<IObserver<T>, IObserver<T>>();
return Observable.Create<T>(
subscribe: observer =>
{
var gate = new object();
var n = 2;
var mergeObserver = Observer.Create<T>(
x =>
{
lock (gate)
observer.OnNext(x);
},
ex =>
{
lock (gate)
observer.OnError(ex);
},
() =>
{
lock (gate)
if (--n == 0)
observer.OnCompleted();
}
);
//
// Using .Synchronize(gate) would be a mess, because then we need to
// keep the two synchronized sequences around as well, such that we
// can call Unsubscribe on those. So, we're "better off" inlining the
// locking code in the observer.
//
// (Or: how composition goes down the drain!)
//
xs.Subscribe(mergeObserver);
ys.Subscribe(mergeObserver);
lock (map)
map[observer] = mergeObserver;
},
unsubscribe: observer =>
{
var mergeObserver = default(IObserver<T>);
lock (map)
map.TryGetValue(observer, out mergeObserver);
if (mergeObserver != null)
{
xs.Unsubscribe(mergeObserver);
ys.Unsubscribe(mergeObserver);
}
}
);
}
注意这是假设的;我什至没有考虑过更多的边缘情况,也没有考虑过这个 Create 将如何工作,以便在调用 OnError 或 OnCompleted 后自行清理。此外,以 Merge 为例,我们很幸运在“取消订阅”期间没有其他资源需要关心(例如调度程序作业)。
希望这可以帮助,
-巴特(Rx 团队)