12

阅读有关响应式扩展等的 msdn 时,我发现一个建议说我不应该实现 IObservable,而应该使用 Observable.Create...当我读到这篇文章时,我的项目已经有了一个ObservableImplementation<T>类,我d 用作 IObservable 源,在任何我想将事件转换为 Observable 的地方。

我已经阅读了AbstractObservable<T>System.Reactive 中的实现,我没有发现他们的代码和我的代码有任何重大区别。那么实现 IObservable 有什么问题呢?我可以向它添加我自己的属性,等等......

为了完整起见,这是我的实现,如果我做错了什么,请告诉我!

public sealed class ObservableImplementation<T> : IObservable<T>
{
    class Subscription : IDisposable
    {
        private readonly Action _onDispose;
        public Subscription(Action onDispose)
        {
            _onDispose = onDispose;
        }

        public void Dispose()
        {
            _onDispose();
        }
    }


    public void Raise(T value)
    {
        _observers.ForEach(o => o.OnNext(value));
    }
    public void Completion()
    {
        _observers.ForEach(o => o.OnCompleted());
        _observers.Clear();
    }

    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();  
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = new Subscription(() => _observers.Remove(observer));
        _observers.Add(observer);
        return subscription;
    }
    public bool AnyObserverPresent { get { return _observers.Any(); } }
}
4

3 回答 3

22

我们不建议人们直接实现 IObservable<T> 有几个原因。

一是缺乏对违反观察者语法的保护。例如,您的序列可能会在 OnCompleted 调用之后显示 OnNext 调用的行为,这是无效的。Observable.Create<T> 方法和 ObservableBase<T> 基类型通过在接收到终端消息时自动分离观察者来处理这一点。因此,即使您的代码做错了事,观察者也不会看到格式错误的序列。

顺便说一句,这类似于 C# 中的迭代器。手动实现 IEnumerable<T> 应该使得当枚举器的 MoveNext 返回 false(类似于 OnCompleted)时,后续调用不会改变主意并开始返回 true(类似于 OnNext):

如果 MoveNext 超过集合的末尾,则枚举数位于集合中的最后一个元素之后,并且 MoveNext 返回 false。当枚举数位于此位置时,对 MoveNext 的后续调用也将返回 false,直到调用 Reset。(来源:MSDN)

在 C# 2.0 或 VB 11.0 中使用迭代器时,会为您解决这些问题。这类似于我们的 Observable.Create<T> 方法和 ObservableBase<T> 基本类型。

与上述讨论相关的一个原因是清理。从订阅的 Dispose 调用返回后,观察者将不再看到任何消息吗?向观察者发送终端消息后,是否会自动调用相关订阅的 Dispose 逻辑?两者都不是微不足道的,所以我们的基本实现会处理这个问题。

另一个原因与我们的 CurrentThreadScheduler 有关,确保在该调度程序上运行时订阅调用可以是异步的。本质上,我们需要在调用订阅期间检查是否需要在当前线程上安装蹦床。我们不希望每个人都知道这一点并做正确的事。

在您的特定情况下 - 正如其他人在这里所注意到的那样 - 您正在构建一个主题。要么只使用我们的主题之一,要么通过包含在您自己的类型中将其包装(例如,如果您希望发送“观察者”端可以被其他方访问,而不是接收“可观察”端)。

于 2012-05-23T23:19:01.413 回答
10

您不应该实施IObservable<T>的原因与您通常不实施的原因相同IEnumerable<T>,因为有人很可能已经构建了您想要的东西。在这种情况下,您基本上已经重新实现Subject<T>了大部分。

编辑:关于评论中的懒惰问题,我会这样实现:

var lazyObservable = Observable.Create<TFoo>(subj => { /*TODO: Implement Me to load from reflection*/ })
    .Multicast(new Subject<TFoo>())   // This means it'll only calc once
    .RefCount();    // This means it won't get created until someone Subscribes
于 2012-05-07T17:45:53.190 回答
7

Rx 团队最近的一篇博客文章包含三个原因。因为这篇文章很长,所以我复制了相关部分。

执行合同

Observable.Create 接受一个委托,该委托将成为生成的 IObservable 实现上的 Subscribe 方法的核心实现。我们对这个委托做了一些巧妙的包装,以强制执行观察者契约等(这就是为什么你不应该自己实现接口的原因)。

一次性包装纸

返回的一次性有一个小包装器,用于确保在从 Dispose 调用返回后不会再调用观察者,即使调度程序可能还没有处于良好的停止点。(还有一个你永远不应该手动实现 IObservable 接口的原因。哦,顺便说一句,还有更多!)

完成后自动处理

这里的兴趣点是在向下游发送 OnCompleted 时应用于源订阅的自动处置行为。(这也是强烈反对手动实现 IObservable 的另一个原因。在使用 Observable.Create 时,我们会为您解决这个问题。)

于 2012-06-18T23:55:31.470 回答