5

在 RxJS 中,当您想按顺序运行 http 请求时,您可以将它们链接起来。但我不清楚如何并行运行请求?我在http://reactive-extensions.github.io/learnrx/上的示例中看到他们使用 Observable.zip() 并行运行 2 个请求。但是你将如何并行运行 5 个请求呢?更具体地说,如何设置以便调用我的函数:

  • 当所有 5 完成?
  • 什么时候第一次完成?
4

5 回答 5

8

使用combineLatestforkJoin

// Assume you have an array of urls
const urls = [
  "twitter.com/puppies.json", 
  "google.com/puppies.json", 
  "facebook.com/puppies.json"
];

// Let's map these urls to Ajax Observables
const requests = urls.map(url => Rx.DOM.Ajax.getJSON(url))

// Now combine the result from each request into an observable
// Here's combineLatest:
const allThePuppies$ = Rx.Observable.combineLatest(...urls)
// Alternatively, here's forkJoin:
const allThePuppies$ = Rx.Observable.forkJoin(urls)


// When you subscribe to `allThePuppies$`, you'll kick off all your requests in parallel, and your response will contain an array with the results from each request:
allThePuppies$.subscribe(results => {
  const twitterPuppies, googlePuppies, facebookPuppies = results;
  // Do what you must with the respective responses
  // (Presumably in this example you'd show your users some adorable pics of puppies)
})

combineLatest接收任意数量的 observable,一旦它们中的每一个都发出了至少一个值,当这些 observable 中的任何一个触发时,它将从每个 observable 中发出一个最新值的数组。

不过,这太抽象了。出于我们的目的,我们知道少数 ajax 请求实际上只会发出一次。因此,如果我们使用combineLatest少量的 ajax 可观察对象,我们最终会得到一个可观察对象,它会从每个 ajax 请求中发出一组结果。

forkJoin类似于combineLatest,但它仅在其每个组成的 observables 完成后才发出其响应数组。

于 2017-06-03T03:22:25.480 回答
1

这是一个相当古老的问题,但没有公认的答案。您正在寻找的答案可能非常简单:concatMap

当一个 Promise 被创建时,它会立即开始执行,所以它们是并行执行的;而当值从一个可观察对象发出时,它们是串行的。

所以将这两者结合起来,对于下面的代码片段,promise 中的 observables 是并行执行的,它们的结果是串行发出的,因为 concatMap 按照它们创建的顺序将它们放入一个流中。

Rx.Observable.from(urls_array)
.concatMap(function(url) { return Rx.Observable.fromPromise(Promise.resolve($.get(url))) })
.subscribe(
  function(jsonObj) {
    // first result will arrive first
  },
  function(err) { },
  function() {
    // all completed
  }
)
于 2016-01-22T04:27:18.170 回答
0

.zip() 可以帮助你!

const a$ = Observable.interval(200).take(6) 
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
  .zip(b$,a$)
  .subscribe(v=>console.log(v))


// marble 
-0-1-2-3-4-5|    (a$)
--0--1--2--3--4| (b$)
---0---1---2|    (c$)
  zip(a$, b$)
---[0,0,0]---[1,1,1]---[2,2,2]|

// console.log
[0,0,0]
pause(400ms)
[1,1,1]
pause(400ms)
[2,2,3]

.zip(arg1, arg2, (本身, arg1, arg2)=> doSomething() )

const a$ = Observable.interval(200).take(6)
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
  .zip(b$,a$, (c,b,a)=>a+b+c)
  .subscribe(v=>console.log(v))

// console.log()
0
pause(400ms)
3 = (1+1+1)
pause(400ms)
9 = (3+3+3)

或者

合并()+平面图()

import Rx, { Observable } from 'rxjs'
import axios from 'axios'

const promiseA = axios.get('https://jsonplaceholder.typicode.com/users/1')
    , promiseB = axios.get('https://jsonplaceholder.typicode.com/users/2')
    , promiseC = axios.get('https://jsonplaceholder.typicode.com/users/3')

Observable.interval(0).take(1)
  .flatMap(()=>Observable.merge(promiseA, promiseB, promiseC)) 
   // flatMap will resolve the promise for you!
  .map(res=>res.data.username)
  .reduce((arr,item)=>arr.concat(item),[])
  .subscribe(v=>console.log(v)) // [ 'Samantha', 'Antonette', 'Bret' ]
于 2017-01-11T03:55:26.613 回答
-1

也许这个链接会对你有所帮助http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_methods/forkjoin.html

于 2014-11-17T23:32:18.967 回答
-4

你可以看看https://www.npmjs.org/package/async

它是一个节点模块,也可以在浏览器中使用。

于 2014-10-19T08:45:15.577 回答