3

试图了解Subject<T>,ReplaySubject<T>和其他的工作原理。这是示例:

(主题是可观察的观察者)

public IObservable<int> CreateObservable()
{
     Subject<int> subj = new Subject<int>();                // case 1
     ReplaySubject<int> subj = new ReplaySubject<int>();    // case 2

     Random rnd = new Random();
     int maxValue = rnd.Next(20);
     Trace.TraceInformation("Max value is: " + maxValue.ToString());

     subj.OnNext(-1);           // specific value

     for(int iCounter = 0; iCounter < maxValue; iCounter++)
     {
          Trace.TraceInformation("Value: " + iCounter.ToString() + " is about to publish");
          subj.OnNext(iCounter);
      }

      Trace.TraceInformation("Publish complete");
      subj.OnComplete();

      return subj;
 }

 public void Main()
 {
     //
     // First subscription
     CreateObservable()
         .Subscribe(
               onNext: (x)=>{  
                  Trace.TraceInformation("X is: " + x.ToString()); 
      });

     //
     // Second subscribe
     CreateObservable()
         .Subscribe(
               onNext: (x2)=>{  
                  Trace.TraceInformation("X2 is: " + x.ToString());
     });

案例1:奇怪的情况是 - 当我使用时Subject<T>没有订阅(???) - 我从来没有看到“X is:”文本 - 我只看到“Value is:”和“Max value is”......为什么不Subject<T>将值推送到订阅?

案例 2:如果我使用ReplaySubject<T>- 我确实看到了 Subscription 中的值,但我无法将Defer选项应用于任何内容。Not to Subjectand not to Observable....所以每个订阅都会收到不同的值,因为CreateObservable函数是可观察的。在哪里Defer

4

2 回答 2

10

每当您需要凭空创建一个 observable 时,Observable.Create 应该是首先想到的。主体进图有两种情况:

  • 您需要某种“可寻址端点”来提供数据,以便所有订阅者都能接收到它。将此与具有调用端(通过委托调用)和订阅端(通过委托结合 +- 和 -= 语法)的 .NET 事件进行比较。你会发现在很多情况下,你可以使用 Observable.Create 来达到同样的效果。

  • 您需要在查询管道中多播消息,通过查询逻辑中的许多分支有效地共享可观察序列,而不会触发多个订阅。(想想为你的宿舍订阅一次你最喜欢的杂志,然后在信箱后面放一台复印机。你仍然需要支付一次订阅费,尽管你所有的朋友都可以在信箱上阅读通过 OnNext 发送的杂志。)

此外,在很多情况下,Rx 中已经有一个内置的原语可以完全满足您的需求。例如,有 From* 工厂方法来桥接现有概念(例如事件、任务、异步方法、可枚举序列),其中一些使用了隐藏的主题。对于多播逻辑的第二种情况,有 Publish、Replay 等一系列操作符。

于 2012-08-21T07:18:23.280 回答
4

您需要注意代码何时执行。

在“案例 1”中,当您使用 a 时Subject<T>,您会注意到在方法返回可观察对象之前对OnNext&完成的所有调用。由于您使用的是 a这意味着任何后续订阅都将错过所有值,因此您应该期望得到您所得到的 - 什么都没有。OnCompletedCreateObservableSubject<T>

您必须延迟对该主题的操作,直到您订阅了观察者。Create使用该方法来做到这一点。就是这样:

public IObservable<int> CreateObservable()
{
    return Observable.Create<int>(o =>
    {
        var subj = new Subject<int>();
        var disposable = subj.Subscribe(o);

        var rnd = new Random();
        var maxValue = rnd.Next(20);
        subj.OnNext(-1);
        for(int iCounter = 0; iCounter < maxValue; iCounter++)
        {
            subj.OnNext(iCounter);
        }
        subj.OnCompleted();

        return disposable;
    });
}

为了简洁起见,我删除了所有跟踪代码。

所以现在,对于每个订阅者,您都可以在Create方法内获得新的代码执行,您现在可以从 internal 获取值Subject<T>

使用Create方法通常是创建从方法返回的可观察对象的正确方法。

或者,您可以使用 aReplaySubject<T>并避免使用该Create方法。然而,由于多种原因,这并不吸引人。它强制在创建时计算整个序列。这给了你一个冷的 observable,你可以在不使用回放主题的情况下更有效地制作它。

现在,顺便说一句,您应该尽量避免使用主题。一般规则是,如果您使用主题,那么您做错了什么。该CreateObservable方法最好这样写:

public IObservable<int> CreateObservable()
{
    return Observable.Create<int>(o =>
    {
        var rnd = new Random();
        var maxValue = rnd.Next(20);
        return Observable.Range(-1, maxValue + 1).Subscribe(o);
    });
}

完全不需要主题。

让我知道这是否有帮助。

于 2012-08-20T08:22:52.720 回答