35

在特定时间内产生 Observable 值的最惯用的方法是什么?例如,假设我从一个大数组创建了一个 Observable,我想每 2 秒产生一个值。是最好的结合interval方式selectMany吗?

4

11 回答 11

37

对于您的具体示例,想法是将数组中的每个值映射到一个可观察对象,该可观察对象将在延迟后产生其结果,然后连接生成的可观察对象流:

var delayedStream = Rx.Observable
    .fromArray([1, 2, 3, 4, 5])
    .map(function (value) { return Rx.Observable.return(value).delay(2000); })
    .concatAll();

其他示例可能确实使用了timeror interval。这取决于。

例如,如果您的数组真的非常大,那么上面的内容会导致相当大的内存压力(因为它正在N为非常大的N. 这是interval用于懒惰地遍历数组的替代方法:

var delayedStream = Rx.Observable
    .interval(2000)
    .take(reallyBigArray.length) // end the observable after it pulses N times
    .map(function (i) { return reallyBigArray[i]; });

这将每 2 秒从数组中产生下一个值,直到它遍历整个数组。

于 2014-02-09T19:15:13.940 回答
26

我认为使用 zip 可以生成更好、更易读的代码,但仍然只使用 3 个 observables。

var items = ['A', 'B', 'C'];

Rx.Observable.zip(
  Rx.Observable.fromArray(items),
  Rx.Observable.timer(2000, 2000),  
  function(item, i) { return item;}
)
于 2015-08-06T11:52:44.620 回答
9

虽然布兰登的回答得到了这个想法的要点,但这是一个立即产生第一个项目的版本,然后在以下项目之间放置时间。

var delay = Rx.Observable.empty().delay(2000);

var items = Rx.Observable.fromArray([1,2,3,4,5])
  .map(function (x) {
    return Rx.Observable.return(x).concat(delay); // put some time after the item
  })
  .concatAll();

更新了较新的 RxJS:

var delay = Rx.Observable.empty().delay(2000);

var items = Rx.Observable.fromArray([1,2,3,4,5])
  .concatMap(function (x) {
    return Rx.Observable.of(x).concat(delay); // put some time after the item
  });
于 2014-02-09T22:04:28.107 回答
9

对于 RxJS v6,下一个延迟为 2 秒。

示例 1. concatMap:

import {of} from 'rxjs';
import {concatMap, delay} from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    concatMap(x => of(x)
      .pipe(
        delay(2000))
    )
  )
  .subscribe({
    next(value) {
      console.log(value);
    }
  });

示例2.map + concatAll:

import {of} from 'rxjs';
import {concatAll, delay, map} from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    map(x => of(x)
      .pipe(
        delay(2000))
    ),
    concatAll()
  )
  .subscribe({
    next(value) {
      console.log(value);
    }
  });
于 2018-05-29T13:09:30.483 回答
6

对于 RxJS 5:

Rx.Observable.from([1, 2, 3, 4, 5])
  .zip(Rx.Observable.timer(0, 2000), x => x)
  .subscribe(x => console.log(x));
于 2017-03-25T12:01:44.827 回答
4

同意 zip 是一种干净的方法。这是为数组生成间隔流的可重用函数:

function yieldByInterval(items, time) {
  return Rx.Observable.from(items).zip(
    Rx.Observable.interval(time),
    function(item, index) { return item; }
  );
}

// test
yieldByInterval(['A', 'B', 'C'], 2000)
  .subscribe(console.log.bind(console));

这建立在farincz's answer 的基础上,但通过使用.zip作为实例方法会稍微短一些。

另外,我使用了Rx.Observable.from()因为Rx.Observable.fromArray()弃用

于 2015-09-16T12:20:33.917 回答
4

由于没有提到这一点,我认为concatMap结合起来delay很容易阅读。

Rx.Observable.fromArray([1, 2, 3, 4, 5])
    .concatMap(x => Rx.Observable.of(x).delay(1000));

https://codepen.io/jmendes/pen/EwaPzw

于 2017-09-13T18:25:42.427 回答
0

基于 farincz 和 user3587412 的 zip 解决方案,这是它在 RxJS v6 中的工作方式

const { zip, from, timer } = require("rxjs")
const { map } = require("rxjs/operators")

const input = [1, 2, 3, 4, 5]
const delay = 2000

zip(
    from(input),
    timer(0, delay)
).pipe(
    map(([ delayedInput, _timer ]) => delayedInput) // throw away timer index
).subscribe(
    console.log
)
于 2018-07-29T11:56:25.217 回答
0

一个简单的单行:

const delayMs = 2000
from([1, 2, 3]).pipe(concatMap(x => of(x).pipe(delay(delayMs)))).subscribe(item => {

});
于 2022-02-25T06:08:32.390 回答
0
//create a custom operator
const delayEach=(millis)=>(o)=>o.pipe(concatMap((x)=>of(x).pipe(delay(millis))))



of(1, 2, 3, 4, 5)
  .pipe(delayEach(1000))
  .subscribe(console.log);
于 2020-06-07T20:53:19.240 回答
0

RxJs 6 代码立即发出第一个项目并延迟其余项目:

import { of, EMPTY, concat } from "rxjs";
import { concatMap, delay } from "rxjs/operators";

const delayed$ = EMPTY.pipe(delay(1000));

console.log("start");
of(1, 2, 3, 4)
  .pipe(concatMap(v => concat(of(v), delayed$)))
  .subscribe({
    next: console.log
  });

完整的 Stackblitz 示例

主意:

  • 对于每个项目,我们创建一个 observable(使用concat),它将立即输出项目(of(v)),然后EMPTY在延迟后发出一个 observable
  • 因为我们使用concatMap所有发射的 observables 将按照正确的顺序发射
于 2021-01-27T09:28:45.873 回答