11

在阅读了原因的解释后

Observable.Return(5)
  .Repeat()
  .Take(1)

永远不会完成,但是

Observable.Return(5, Scheduler.CurrentThread)
  .Repeat()
  .Take(1)

按预期工作。我仍然很困惑,我不知道为什么CurrentThread实际上解决了这个问题。有人可以给出明确的解释吗?

4

1 回答 1

7

Ned Stoyanov 在上面的评论中提供的链接有 Dave Sexton 的一个很好的解释。

我会尝试用不同的方式来说明它。以在 RecursiveMethod 中发生递归调用的示例为例。

public class RecursiveTest()
{
    private bool _isDone;

    public void RecursiveMethod()
    {
        if (!_isDone)
        {
            RecursiveMethod();

           // Never gets here...
           _isDone = true;
        }
    }  
}

您可以很容易地看到这将无限期地递归(直到 StackOverflowException),因为 _isDone 永远不会设置为 true。这是一个过于简化的示例,但它基本上是您的第一个示例的情况。

这是 Dave Sexton 的解释,用于描述您的第一个示例中发生的情况。

默认情况下,Return 使用 ImmediateScheduler 调用 OnNext(1),然后调用 OnCompleted()。Repeat 不会引入任何并发,因此它会立即看到 OnCompleted,然后立即重新订阅 Return。因为 Return 中没有蹦床,所以这种模式会重复自身,无限期地阻塞当前线程。在这个 observable 上调用订阅永远不会返回。

换句话说,由于重入的无限循环,初始流程永远不会完全完成。所以我们需要一种方法来完成初始流程而无需重入。

让我们回到本文上面的 RecursiveTest 示例,避免无限递归的解决方案是什么?在再次执行 RecursiveMethod 之前,我们需要 RecursiveMethod 完成其流程。一种方法是有一个队列并将对 RecursiveMethod 的调用排入队列,如下所示:

public void RecursiveMethod()
{
    if (!_isDone)
    {
        Enqueue(RecursiveMethod);
        _isDone = true;
    }
}  

这样,初始流程将完成,_isDone 将设置为 true,并且当执行下一次对 RecursiveMethod 的调用时,将不再执行任何操作,从而避免无限递归。这几乎就是 Scheduler.CurrentThread 将对您的第二个示例执行的操作。

让我们看看 Dave Sexton 如何解释您的第二个示例的工作原理:

在这里,Return 使用 CurrentTheadScheduler 调用 OnNext(1),然后调用 OnCompleted()。Repeat 没有引入任何并发,所以它立即看到 OnCompleted 然后立即重新订阅 Return;但是,第二次订阅 Return 会在蹦床上安排其(内部)操作,因为它仍在第一个计划(外部)操作的 OnCompleted 回调上执行,因此不会立即发生重复。这允许Repeat返回一个一次性的Take,它最终调用OnCompleted,通过释放Repeat取消重复,最终来自Subscribe的调用返回。

同样,我的示例确实被简化了,以使其易于理解,但这并不是它的工作原理。在这里你可以看到调度器是如何工作的。它使用他们所谓的 Trampoline,它基本上是一个确保没有可重入调用的队列。因此,所有调用都在同一线程上一个接一个地序列化。通过这样做,可以完成初始流程,避免无限重入循环。

希望这更清楚:)

于 2015-06-24T20:48:37.813 回答