在阅读了原因的解释后
Observable.Return(5)
.Repeat()
.Take(1)
永远不会完成,但是
Observable.Return(5, Scheduler.CurrentThread)
.Repeat()
.Take(1)
按预期工作。我仍然很困惑,我不知道为什么CurrentThread
实际上解决了这个问题。有人可以给出明确的解释吗?
在阅读了原因的解释后
Observable.Return(5)
.Repeat()
.Take(1)
永远不会完成,但是
Observable.Return(5, Scheduler.CurrentThread)
.Repeat()
.Take(1)
按预期工作。我仍然很困惑,我不知道为什么CurrentThread
实际上解决了这个问题。有人可以给出明确的解释吗?
Ned Stoyanov 在上面的评论中提供的链接有 Dave Sexton 的一个很好的解释。
我会尝试用不同的方式来说明它。以在 RecursiveMethod 中发生递归调用的示例为例。
public class RecursiveTest()
{
private bool _isDone;
public void RecursiveMethod()
{
if (!_isDone)
{
RecursiveMethod();
// Never gets here...
_isDone = true;
}
}
}
您可以很容易地看到这将无限期地递归(直到 StackOverflowException),因为 _isDone 永远不会设置为 true。这是一个过于简化的示例,但它基本上是您的第一个示例的情况。
这是 Dave Sexton 的解释,用于描述您的第一个示例中发生的情况。
默认情况下,Return 使用 ImmediateScheduler 调用 OnNext(1),然后调用 OnCompleted()。Repeat 不会引入任何并发,因此它会立即看到 OnCompleted,然后立即重新订阅 Return。因为 Return 中没有蹦床,所以这种模式会重复自身,无限期地阻塞当前线程。在这个 observable 上调用订阅永远不会返回。
换句话说,由于重入的无限循环,初始流程永远不会完全完成。所以我们需要一种方法来完成初始流程而无需重入。
让我们回到本文上面的 RecursiveTest 示例,避免无限递归的解决方案是什么?在再次执行 RecursiveMethod 之前,我们需要 RecursiveMethod 完成其流程。一种方法是有一个队列并将对 RecursiveMethod 的调用排入队列,如下所示:
public void RecursiveMethod()
{
if (!_isDone)
{
Enqueue(RecursiveMethod);
_isDone = true;
}
}
这样,初始流程将完成,_isDone 将设置为 true,并且当执行下一次对 RecursiveMethod 的调用时,将不再执行任何操作,从而避免无限递归。这几乎就是 Scheduler.CurrentThread 将对您的第二个示例执行的操作。
让我们看看 Dave Sexton 如何解释您的第二个示例的工作原理:
在这里,Return 使用 CurrentTheadScheduler 调用 OnNext(1),然后调用 OnCompleted()。Repeat 没有引入任何并发,所以它立即看到 OnCompleted 然后立即重新订阅 Return;但是,第二次订阅 Return 会在蹦床上安排其(内部)操作,因为它仍在第一个计划(外部)操作的 OnCompleted 回调上执行,因此不会立即发生重复。这允许Repeat返回一个一次性的Take,它最终调用OnCompleted,通过释放Repeat取消重复,最终来自Subscribe的调用返回。
同样,我的示例确实被简化了,以使其易于理解,但这并不是它的工作原理。在这里你可以看到调度器是如何工作的。它使用他们所谓的 Trampoline,它基本上是一个确保没有可重入调用的队列。因此,所有调用都在同一线程上一个接一个地序列化。通过这样做,可以完成初始流程,避免无限重入循环。
希望这更清楚:)