1

摘要:我正在使用 Rxjs 和一个新手。我想用可观察的方式实现这样的场景,但到目前为止还没有运气。

有一个函数 loadDetailsFromServer(itemIds),它调用服务器 api 并传递一些项目 ID。这个函数被偶尔调用。为了优化服务器调用,这就是我想要做的:随着第一个函数调用的到来,触发超时。如果在超时之前有任何新的函数调用到达,则重置超时以重新开始。当超时开始时,将进行服务器调用,并且参数计数重置为零。

这是一个大理石图:

Timer is 4 clicks.
INPUTS IN TIME        1-2---3-4-----5--------6-7--------
loadDetailsFromServer [1,2,3,4]   -      [5]         -[6,7]  

function called with [1,2,3,4] because no more calls after 4 clicks.

提示:这类似于搜索框示例并从服务器获取结果,除了中间值是感兴趣的,并且不会被忽略/跳过。

4

2 回答 2

2

例如,如果您有这样的源 Observable:

const Rx = require('rxjs/Rx');
const Observable = Rx.Observable;

const TIMEOUT = 1000;

const source = Observable.range(1, 20)
    .concatMap(v => Observable.of(v).delay(Math.random() * 2000));

然后你可以使用缓冲它的值scan。要重置我正在使用的缓冲区.merge(bufferNotifier.mapTo(null))。然后switchMap()我总是等待 1000 毫秒forkJoin()来发射。如果不是,它会被另一个 Observable “覆盖”,因为新缓冲区到达:

const bufferNotifier = new Subject();

const chain = source
    .do(undefined, undefined, () => bufferNotifier.complete()) // properly complete the chain
    .merge(bufferNotifier.mapTo(null)) // reset buffer Subject
    .scan((acc, val) => {
        if (val === null) {
            return [];
        }
        acc.push(val);
        return acc;
    }, [])
    .filter(arr => arr.length > 0)
    .switchMap(buffer => { // wait 1s until emitting the buffer further
        return Observable.forkJoin(
            Observable.of(buffer),
            Observable.timer(1000).take(1),
            arr => arr
        );
    })
    .do(() => bufferNotifier.next()) // trigger reset the buffer
    .subscribe(console.log);

这输出例如:

[ 1 ]
[ 2 ]
[ 3, 4 ]
[ 5 ]
[ 6, 7 ]
[ 8, 9, 10, 11, 12 ]
[ 13 ]
[ 14, 15 ]
[ 16 ]
[ 17 ]
[ 18 ]
[ 19, 20 ]
于 2017-08-24T11:43:43.140 回答
0

如果您对马丁的回答有类似source的可观察性,那么这样的事情可能会起作用:

source
  .buffer(source.debounceTime(250))
  .subscribe(console.log);

buffer收集所有发出的值,直到给定的 observable 发出。在这种情况下,它会一直等到debounceTime发出。CodePen:https ://codepen.io/anon/pen/PKBaZm?editors=1010

于 2017-08-25T01:18:44.380 回答