0

我订阅了一个推送频率很高的 Observable,这些内容来自网络 I/O,所以每次推送最初来自不同的线程,然后我有一些观察者可能会尝试获取一些内容然后快速取消订阅以确保有没有其他内容传入,因此代码示例如下:

        IDisposable dsp = null;
        dsp = TargetObservable.Subscribe((incomingContent) =>
        {
            if (incomingContent == "something")
            {
                myList.Add(incomingContent);
                dsp.Dispose();
            }
            else
            {
                otherList.Add(incomingContent);
            }
        });

目前, OnNext 显然不是线程安全的,这意味着当观察者获得“某物”并且就在调用 Dispose() 之前,其他内容可能仍会传入并添加到 'otherList',即使我放了一个 'lock(.. .)' 代表整个 'onNext(...)'。
这不是我们想要的,所以有什么想法可以避免这种情况吗?我能想到的一种方法是修改 Observable 以逐个推送内容(通过使用'lock'),那么性能肯定会受到很大影响。谢谢。

4

3 回答 3

5

要使用 Rx,您需要遵循Rx 指南。在您的情况下,4.2 存在问题假设观察者实例以序列化方式调用,解决方案是使用Synchronize它基本上引入了lock您想要避免的。如果您无法lock在代码中使用语句,则需要在将网络事件触发到 Rx 之前编写自己的“廉价”同步。

使用同步序列,您可以OnNext通过使用 Rx LINQ 运算符来简化处理程序中的代码,例如TakeWhile

var subscription = TargetObservable
  .Synchronize()
  .TakeWhile(incomingContent => incomingContent != "something"))
  .Subscribe( ... );

或者您可以创建自己的运算符TakeWhileInclusive以包含谓词为假的最后一项:

static class ObservableExtensions {

  public static IObservable<TSource> TakeWhileInclusive<TSource>(
       this IObservable<TSource> source, 
       Func<TSource, Boolean> predicate) {
    return Observable
      .Create<TSource>(
        observer => source.Subscribe(
          item => {
            observer.OnNext(item);
            if (!predicate(item))
              observer.OnCompleted();
          },
          observer.OnError,
          observer.OnCompleted
        )
      );
  }
}
于 2013-09-16T08:39:12.950 回答
1

您正在将一些非 Rx 代码与 Rx 代码混合。您应该避免在订阅中处理订阅。

这是我想做的事情的首选方式:

TargetObservable
    .TakeWhile(x => x != "something")
    .Subscribe(otherList.Add);

TargetObservable
    .Where(x => x == "something")
    .Take(1)
    .Subscribe(myList.Add);

就是这样,因为两个订阅都会自动取消订阅。如果您的TargetObservable源不生成共享流,您可能需要使用它Publish来制作共享的 observable。

或者,您可以这样做:

TargetObservable
    .Do(x =>
    {
        if (x != "something") otherList.Add(x);
    })
    .Where(x => x == "something")
    .Take(1)
    .Subscribe(x => myList.Add(x));
于 2013-09-18T03:42:33.713 回答
-1

油门可能能够完成这项工作:http ://rxwiki.wikidot.com/101samples#toc30 。

于 2013-09-16T07:15:37.170 回答