我阅读了 Reactive Extension 的代码,然后我找到了这段代码
private readonly IObservable<TSource> _source;
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
var sink = new _(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
我发现方法SubscribeSafe(sink)
在类ObservableExtensions
中,而类没有实现接口IObservable
。为什么参数_source有成员函数 SubscribeSafe
()?