0

我的项目中有很多代码,例如通过像这样使用Reactive 扩展来点击和静音:

    IDisposable dsp = null;
    dsp = TargetObservable.Subscribe((incomingContent) =>
    {
        if (incomingContent == "something")
        {
            myList.Add(incomingContent);
            dsp.Dispose();
        }
    });

首先,我担心线程安全,因为我的 Observable 很忙,一路推送一堆内容,但是后来,我被告知我应该结合ObserveOn(thread)来保证线程安全,我完全同意,所以让我们忘记线程安全的东西。

在这里我想知道:

  1. 我应该如何或何时调用 Dispose 以获取 observable。
  2. 什么是满足Hit 和 mute的正确方法,结合一些完整的扩展方法Take(count),如“TakeWhile(predict)”?
  3. 如果OnComplete()调用,Dispose()将在内部调用,对吗?那么 Observer 和 Observable 之间的引用关系就会中断(因为我的 observable 是一个长寿命的静态实例,引用会导致内存泄漏)。
4

1 回答 1

8

我会避免遵循你在这里的模式。如果其他开发人员必须将全局状态与 subscribe/OnNext 处理程序的内部函数混合使用,则很难理解问题空间。

您最好创建封装序列终止的TakeWhile/TakeUntilIncluding扩展方法。然后,您可以将您的“添加到列表”问题分开。

另一种做法是超级简单:

var subscription = source.Where(x => x=="something")
                         .Take(1)
                         .Subscribe(incomingContent=>myList.Add(incomingContent));
于 2013-09-23T09:04:15.020 回答