RxJs 遵循与 Rx.Net 相同的规则。默认情况下,每个可观察操作员使用完成其工作所需的最小异步性。在这种情况下,Range
可以同步运行这些数字,它确实如此(它的文档告诉你它将Rx.Scheduler.currentThread
默认使用 .
如果您想引入比操作所需更多的异步性,则需要告诉它使用不同的Scheduler。
要获得您期望的行为,您需要使用Rx.Scheduler.timeout
. 从本质上讲,这将导致它通过setTimeout
. (实际上并不是这么简单,调度器会使用浏览器中最快的方法来调度延迟的工作)。
var range = Rx.Observable.range(0, 3000000, Rx.Scheduler.timeout);
更新了 jsFiddle
请注意,通过 300 万个数字迭代setTimeout
将花费几乎永远的时间。因此,也许我们希望分批处理 1,000 个。因此,在这里我们将利用Range
同步运行的默认行为,然后批处理值并observeOn
通过我们的超时调度程序用于运行批处理:
var range = Rx.Observable
.range(0, 3000000)
.bufferWithCount(1000)
.observeOn(Rx.Scheduler.timeout) // run each buffer via setTimeout
.select(function (buffer, i) {
console.log("processing buffer", i);
return Rx.Observable.fromArray(buffer);
})
.concatAll(); // concat the buffers together
jsFiddlerange
请注意,在遍历所有 3,000,000 个值并bufferWithCount
生成 3,000 个数组时,开始时会有延迟。对于您的数据源不像Observable.range
.
仅供参考的承诺在这方面没有任何不同。如果你调用then
一个已经完成的promise,这个then
函数可能会同步运行。所有 Promise 和 Observables 真正做的是提供一个接口,通过该接口,您可以提供保证在满足条件时运行的回调,无论条件已经满足还是稍后会满足。然后,RxJs 提供了许多机制来强制异步运行某些东西,如果你真的想要那样的话。以及介绍具体时间的方法。