3

我有这堂课:

public class TestService
{
     public IObservable<int> GetObservable(int max)
     {
         var subject = new Subject<int>();
         Task.Factory.StartNew(() =>
                               {
                                   for (int i = 0; i < max; i++)
                                   {
                                       subject.OnNext(i);
                                   }
                                   subject.OnCompleted();

                               });
         return subject;
     }
}

我也为此编写了一个测试方法:

[TestMethod]
public void TestServiceTest1()
{
   var testService = new TestService();
   var i = 0;
   var observable = testService.GetObservable(3);
   observable.Subscribe(_ =>
   {
      i++;
   });          
   observable.Wait();
   Assert.AreEqual(i, 3);
}

但有时我会收到错误消息:序列在方法 Wait() 中不包含任何元素。

我建议在测试到达observable.Wait()行之前完成我的 IObservable。我怎样才能避免这个错误?

4

1 回答 1

4

在我看来,这段代码的基本问题是 anIObservable 代表了如何观察某事的契约。

在这种情况下,该GetObservable方法不仅仅是返回一个合同,而是立即执行工作(使用 TPL)。如果您认为您可以多次订阅同一个IObservable实例(这实际上发生在示例代码中,因为您是第一次订阅Subscribe和另一次订阅Wait),这没有意义。这个区别是我学习 Rx 的最大绊脚石。

我会改为实现这样的方法(并完全避免使用Subject<>,因为它不是创建 Observable 的首选方式):

public class TestService
{
     public IObservable<int> GetObservable(int max)
     {
         return Observable.Create<int>((IObserver<int> observer) =>
                               {
                                   for (int i = 0; i < max; i++)
                                   {
                                       observer.OnNext(i);
                                   }
                                   observer.OnCompleted();
                               });
     }
}

还有一些有趣的覆盖可以Observable.Create提供更大的灵活性。

于 2013-04-22T20:43:45.547 回答