在 RxJS 中,当您想按顺序运行 http 请求时,您可以将它们链接起来。但我不清楚如何并行运行请求?我在http://reactive-extensions.github.io/learnrx/上的示例中看到他们使用 Observable.zip() 并行运行 2 个请求。但是你将如何并行运行 5 个请求呢?更具体地说,如何设置以便调用我的函数:
- 当所有 5 完成?
- 什么时候第一次完成?
在 RxJS 中,当您想按顺序运行 http 请求时,您可以将它们链接起来。但我不清楚如何并行运行请求?我在http://reactive-extensions.github.io/learnrx/上的示例中看到他们使用 Observable.zip() 并行运行 2 个请求。但是你将如何并行运行 5 个请求呢?更具体地说,如何设置以便调用我的函数:
// Assume you have an array of urls
const urls = [
"twitter.com/puppies.json",
"google.com/puppies.json",
"facebook.com/puppies.json"
];
// Let's map these urls to Ajax Observables
const requests = urls.map(url => Rx.DOM.Ajax.getJSON(url))
// Now combine the result from each request into an observable
// Here's combineLatest:
const allThePuppies$ = Rx.Observable.combineLatest(...urls)
// Alternatively, here's forkJoin:
const allThePuppies$ = Rx.Observable.forkJoin(urls)
// When you subscribe to `allThePuppies$`, you'll kick off all your requests in parallel, and your response will contain an array with the results from each request:
allThePuppies$.subscribe(results => {
const twitterPuppies, googlePuppies, facebookPuppies = results;
// Do what you must with the respective responses
// (Presumably in this example you'd show your users some adorable pics of puppies)
})
combineLatest
接收任意数量的 observable,一旦它们中的每一个都发出了至少一个值,当这些 observable 中的任何一个触发时,它将从每个 observable 中发出一个最新值的数组。
不过,这太抽象了。出于我们的目的,我们知道少数 ajax 请求实际上只会发出一次。因此,如果我们使用combineLatest
少量的 ajax 可观察对象,我们最终会得到一个可观察对象,它会从每个 ajax 请求中发出一组结果。
forkJoin
类似于combineLatest
,但它仅在其每个组成的 observables 完成后才发出其响应数组。
这是一个相当古老的问题,但没有公认的答案。您正在寻找的答案可能非常简单:concatMap。
当一个 Promise 被创建时,它会立即开始执行,所以它们是并行执行的;而当值从一个可观察对象发出时,它们是串行的。
所以将这两者结合起来,对于下面的代码片段,promise 中的 observables 是并行执行的,它们的结果是串行发出的,因为 concatMap 按照它们创建的顺序将它们放入一个流中。
Rx.Observable.from(urls_array)
.concatMap(function(url) { return Rx.Observable.fromPromise(Promise.resolve($.get(url))) })
.subscribe(
function(jsonObj) {
// first result will arrive first
},
function(err) { },
function() {
// all completed
}
)
.zip() 可以帮助你!
const a$ = Observable.interval(200).take(6)
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
.zip(b$,a$)
.subscribe(v=>console.log(v))
// marble
-0-1-2-3-4-5| (a$)
--0--1--2--3--4| (b$)
---0---1---2| (c$)
zip(a$, b$)
---[0,0,0]---[1,1,1]---[2,2,2]|
// console.log
[0,0,0]
pause(400ms)
[1,1,1]
pause(400ms)
[2,2,3]
.zip(arg1, arg2, (本身, arg1, arg2)=> doSomething() )
const a$ = Observable.interval(200).take(6)
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
.zip(b$,a$, (c,b,a)=>a+b+c)
.subscribe(v=>console.log(v))
// console.log()
0
pause(400ms)
3 = (1+1+1)
pause(400ms)
9 = (3+3+3)
或者
合并()+平面图()
import Rx, { Observable } from 'rxjs'
import axios from 'axios'
const promiseA = axios.get('https://jsonplaceholder.typicode.com/users/1')
, promiseB = axios.get('https://jsonplaceholder.typicode.com/users/2')
, promiseC = axios.get('https://jsonplaceholder.typicode.com/users/3')
Observable.interval(0).take(1)
.flatMap(()=>Observable.merge(promiseA, promiseB, promiseC))
// flatMap will resolve the promise for you!
.map(res=>res.data.username)
.reduce((arr,item)=>arr.concat(item),[])
.subscribe(v=>console.log(v)) // [ 'Samantha', 'Antonette', 'Bret' ]
也许这个链接会对你有所帮助http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_methods/forkjoin.html
你可以看看https://www.npmjs.org/package/async
它是一个节点模块,也可以在浏览器中使用。