15

我正在阅读 Head First Design Patterns 这本书,并尽我所能将代码从他们的 Java 转换为 C#。在本书讨论了观察者模式之后,它提到 Java 内置了类/接口,.NET4 也是如此。所以我开始研究如何正确使用它,除了方法之外,我已经弄清楚了大部分Subscribe()

如果您在尝试订阅 IObserver 时查看MSDN 文章,该方法会返回 IDisposable。为什么有必要这样做?为什么不直接实现一个基于方法参数取消订阅 IObserver 的方法呢?我研究了使用 IDisposable 接口的原因。我也读了这个,但不太明白区别/它试图告诉我什么:

它返回对 IDisposable 接口的引用。这使观察者能够在提供者完成发送通知并调用订阅者的 OnCompleted 方法之前取消订阅(即停止接收通知)。

4

6 回答 6

8

取消订阅所需的信息将根据事件发布者管理订阅的方式而有所不同。用于事件的方法——将Remove委托传递给之前传递给方法的Add方法——有点可行,但有一些重大缺陷。其中:

  1. 它通常需要事件发布者执行线性搜索来定位包含与订阅相关的信息的记录。如果一个事件有可能有很多订阅者,这可能会不必要地创建 O(N^2) 行为。如果订阅者保存在某种链表中(链接对象或索引链接数组槽),并且取消订阅请求包含有关要取消的订阅的信息,则订阅和取消订阅都可以在恒定时间内处理。此外,取消订阅可以以无锁非阻塞方式安全轻松地处理(在很可能是无竞争的数组插槽上使用一个 `CompareExchange`),可以在 `Finalize` 上下文中安全地完成。
  2. 如果一个委托多次订阅处理顺序很重要的事件,并且代码尝试取消第一个订阅,则最后一个订阅将被取消,而第一个订阅将保持有效。
  3. 如果订阅了委托“D”,则订阅了包含“A”、“B”、“C”和“D”的多播委托“ABCD”,然后“D”被取消订阅,则代表“DABC”将保留按顺序订阅,即使代码尝试取消订阅“ABCD”。请注意,如果使用“List”而不是“delegateType.Combine”,则可以避免此问题,但其他问题仍然存在。

让事件订阅方法返回一个可用于取消订阅的对象可以避免这些问题。那么最大的问题就是应该使用什么类型的对象。想到三个选择:

  1. 一个委托(可能是无参数的,返回 `void`)
  2. 一些`ICancelSubscription`接口,用单一方法(可能无参数,返回`void`)取消订阅
  3. `IDisposable`,一个存在的接口,有一个单一的无参数方法,被广泛用于清理相关的目的

使用委托将是一个合理的选择。它可以很容易地封装取消订阅所需的任何信息,而用户不必担心该信息可能采用什么形式。使用委托将需要分配至少一个额外的堆对象(用于委托本身)和可能的两个(第二个是保存取消订阅信息的对象)。使用IDisposable与使用委托基本相同,除了调用Dispose而不是Invoke; 但是,在许多情况下,IDisposable在效率方面会略有优势。使用其他一些接口也是可行的,但与使用现有的IDisposable.

于 2012-05-16T20:38:27.513 回答
4

一对订阅和取消订阅方法将是非组合的。每个操作员都需要保存一个传入订阅的观察者字典,将它们映射到每个观察者实例上,然后传递给相关的可观察序列(传递给操作员)。

例如,考虑为两个源编写合并运算符。今天,这看起来很像这样(textarea 编译):

static IObservable<T> Merge<T>(IObservable<T> xs, IObservable<T> ys)
{
    return Observable.Create<T>(observer =>
    {
        var n = 2;

        var mergeObserver = Observer.Create<T>(
            observer.OnNext,
            observer.OnError,
            () =>
            {
                // protected by the gate, see use of Synchronize below
                if (--n == 0)
                    observer.OnCompleted();
            }
        );

        var gate = new object();

        return new CompositeDisposable(
            xs.Synchronize(gate).Subscribe(mergeObserver),
            ys.Synchronize(gate).Subscribe(mergeObserver)
        );
    });
}

如您所见,序列的组合还会导致从 Subscribe 调用返回的 IDisposable 对象的组合。请注意,在 Observable.Create 中发生了很多事情,它会在向给定观察者发送终端消息时自动处理返回的 IDisposable。在这种情况下,对observer.OnError 和observer.OnCompleted 的调用负责在CompositeDisposable 中处理这两个订阅。(但这是一个完全不同的话题,需要一段时间来讨论。)

下面的代码是假设的,假设在 IObservable 上存在订阅/取消订阅对(因此使用具有两个操作的 Create 工厂方法):

static IObservable<T> Merge<T>(IObservable<T> xs, IObservable<T> ys)
{
    var map = new Dictionary<IObserver<T>, IObserver<T>>();

    return Observable.Create<T>(
        subscribe: observer =>
        {
            var gate = new object();
            var n = 2;

            var mergeObserver = Observer.Create<T>(
                x =>
                {
                    lock (gate)
                        observer.OnNext(x);
                },
                ex =>
                {
                    lock (gate)
                        observer.OnError(ex);
                },
                () =>
                {
                    lock (gate)
                        if (--n == 0)
                            observer.OnCompleted();
                }
            );

            //
            // Using .Synchronize(gate) would be a mess, because then we need to
            // keep the  two synchronized sequences around as well, such that we
            // can call Unsubscribe on those. So, we're "better off" inlining the
            // locking code in the observer.
            //
            // (Or: how composition goes down the drain!)
            //
            xs.Subscribe(mergeObserver);
            ys.Subscribe(mergeObserver);

            lock (map)
                map[observer] = mergeObserver;
        },
        unsubscribe: observer =>
        {
            var mergeObserver = default(IObserver<T>);
            lock (map)
                map.TryGetValue(observer, out mergeObserver);

            if (mergeObserver != null)
            {
                xs.Unsubscribe(mergeObserver);
                ys.Unsubscribe(mergeObserver);
            }
        }
    );
}

注意这是假设的;我什至没有考虑过更多的边缘情况,也没有考虑过这个 Create 将如何工作,以便在调用 OnError 或 OnCompleted 后自行清理。此外,以 Merge 为例,我们很幸运在“取消订阅”期间没有其他资源需要关心(例如调度程序作业)。

希望这可以帮助,

-巴特(Rx 团队)

于 2012-05-23T22:55:37.913 回答
4

您的主要问题似乎是微软为什么选择:

interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

代替

interface IObservable<T>
{
    void Subscribe(IObserver<T> observer);
    void Unsubscribe(IObserver<T> observer);
}

虽然我不知道设计会议或其中的推理,但我可以推测做出这个选择的一些原因。

我能想到的选择第一种格式的最佳理由是,它允许您仅跟踪从订阅返回的 IDisposable,而不是同时跟踪可观察对象和观察者。例如,假设您有这样的代码:

var obsSource = /* some observable source */
var subscription = obsSource.Where(x => x != null).Subscribe(Console.WriteLine);
// stuff...
subscription.Dispose();

在这种情况下,我不需要持有对订阅的 observable 的引用(从 中返回Where),也不需要显式创建观察者(通过Subscribe扩展方法)。如果选择了第二个选项,您将不得不这样做:

var obsSource = /* some observable source */
var filtered = obsSource.Where(x => x != null);
var observer = Observer.Create(Console.WriteLine);
filtered.Subscribe(observer);
// stuff...
filtered.Unsubscribe(observer);

从表面上看,这看起来并没有太大的不同,但正如我在另一个问题中讨论的那样,一旦订阅,就不需要保留中间可观察对象。如果采用第二种方法,您仍然需要为链中的每个步骤创建并保留观察者,并且还必须保留对中间可观察对象的引用。

版本 1 使对偶性IEnumerable比方法 2 更容易看到。这可能是初始设计的一部分,但对于长期而言,它肯定不是一个不那么重要的原因。

于 2012-05-16T23:11:43.557 回答
1

调用取消订阅方法意味着您需要传入订阅的同一个实例。你必须保留一个参考。如果您有许多不同类型的可观察对象,则需要保留许多不同的引用。

取消订阅方法会使清理代码变得混乱。

但是,IDisposable您只需要一个结构来保存所有引用 - List<IDisposable>(或者您可以使用 Rx's CompositeDisposable)。

现在您的清理代码可以非常整洁。

我更进一步,IDisposable为所有清理代码创建了实例——不仅仅是来自 Rx 订阅。它使生活变得非常轻松。

于 2012-05-17T00:30:17.730 回答
1

除了给出的其他原因之外,IDisposable还允许您使用using块。

using (var subscription = source.Subscribe(observer))
{
    // ...
}
于 2012-05-17T13:20:07.970 回答
0

当观察者本身被 Disposed 时,它可以很容易地确保终止对观察者的订阅。考虑订阅通常由实现 IObservable 的类调用:

class MyObserver<Foo> : IObserver<Foo>, IDisposable {

    private IDisposable _subscription;

    public MyObserver(IObservable<T> eventSource) {
        _subscription = eventSource.Subscribe(this);
    }

    // implementation of IObservable members here

    public void Dispose() {
        _subscription.Dispose();
    }
}

现在,当我的 MyObserver 实例被释放时,订阅将随之自动释放:

public partial class MyGUIThing : Form {
    private MyObservable<Foo> _observer = new MyObservable<Foo>(someEventSource);
    // whatever else
}

创建此表单的实例时,将启动订阅。当表单关闭并_observer完成时,订阅也将自动处理。

于 2012-05-16T20:38:54.307 回答