0

如果我ToEnumerable在 an 上使用扩展名IObservable,则用户有可能不会迭代所有元素。在那种情况下,如何正确处理IDisposable声明的?Observable.Create

为了争论起见,我们说直接将其返回给用户不是一个选项IObservable(在这种情况下,用户可以自己实现取消)。

 private IObservable<Object> MakeObservable()
 {
   return Observable.Create(async (observer, cancelToken) =>
   {
     using(SomeDisposable somedisposable = new SomeDisposable())
     {
        while(true)
        {
          Object result = somedisposable.GetNextObject();
          if(result == null)
          {
            break;
          }
          observer.OnNext(result);
        }       
     }
   }
 }

 public IEnumerable<Object> GetObjects()
 {
   return MakeObservable().ToEnumerable();
 }

 public void Test()
 {
   IEnumerable<Object> e = GetObjects();
   int i = 0;
   foreach(Object o in e)
   {
     if(i++ == 10)
        break;
   }
   //somedisposable is not disposed here!!!
 }
4

1 回答 1

1

问题是您的定义Create不会返回,因此无法停止。如果您将可观察的源更改为基于计时器的东西,那么它可以正常工作。试试这个代码:

public IEnumerable<long> GetObjects()
{
    return Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Finally(() => Console.WriteLine("Done."))
        .ToEnumerable();
}

public void Test()
{
    foreach (long i in GetObjects())
    {
        Console.WriteLine(i);
        if (i == 10)
        {
            break;
        }
    }
}

当你运行它时,你会得到这个输出:

0
1
2
3
4
5
6
7
8
9
10
完毕。

它显然在调用OnCompleted可观察的源。

于 2017-05-11T12:40:16.587 回答