0

在 rxjs 可观察链中,我如何在经过一定时间后访问可观察的当前值?本质上,我正在寻找类似tap 操作符的东西,但只有在经过一定时间而没有从 observable 中看到值的情况下才会执行。所以实际上它就像是点击和超时的组合。

我在想像下面这样的东西

observable$.pipe(
  first(x => x > 5),
  tapAfterTime(2000, x => console.log(x)),
  map(x => x + 1)
).subscribe(...);

这是一个虚构的例子,“tapAfterTime”函数不是真实的。但基本思想是,如果订阅后 2000 毫秒过去了,并且 observable 没有看到大于 5 的值,那么无论 observable 的当前值是什么,都执行 tapAfterTime 回调函数。如果我们在 2000 毫秒之前看到大于 5 的值,那么 tapAfterTime 回调将永远不会运行,但 map 函数将始终按预期运行。

是否有运营商来实现这一点或运营商的任何组合?

4

3 回答 3

1

也许这真的太复杂了,也许值得一看。

这个想法是有 2 个不同的 observables,创建转换源observable$然后最终合并。

第一个 Observable,我们称之为obsFilterAndMapped,是您进行过滤和映射的地方。

第二个 Observable,让我们称之为它,是一个 Observable,它在第一个 Observable (即,发出)的任何时候obsTapDelay以一定的延迟触发一个新的计时器——一旦传递了延迟时间,那么你就执行你的操作——如果第一个 Observable 在之前发出一个新值延迟时间已过,然后创建一个新计时器。obsFilterAndMappedtapAfterTime

这是实现这个想法的代码

const stop = new Subject<any>();
const obsShared = observable$.pipe(
    finalize(() => {
        console.log('STOP');
        stop.next();
        stop.complete()
    }),
    share()
);
const delayTime = 300;
const tapAfterTime = (value) => {
    console.log('tap with delay', value)
}; 

let valueEmitted;

const obsFilterAndMapped = obsShared.pipe(
    tap(val => valueEmitted = val),
    filter(i => i > 7),
    map(val => val + ' mapped')
);

const startTimer = merge(of('START'), obsFilterAndMapped);

const obsTapDelay = startTimer.pipe(
    switchMap(val => timer(delayTime).pipe(
        tap(() => tapAfterTime(valueEmitted)),
        switchMap(() => empty()),
    )),
    takeUntil(stop),
)

merge(obsFilterAndMapped, obsTapDelay)
.subscribe(console.log, null, () => console.log('completed'))

使用这种方法,只要tapAfterTimeobservable$不发出任何内容的时间超过delayTime. 换句话说,这不仅适用于第一次排放,observable$而且适用于其整个生命周期。

您可以使用以下输入测试此类代码

const obs1 = interval(100).pipe(
    take(10),
);
const obs2 = timer(2000, 100).pipe(
    take(10),
    map(val => val + 200),
);
const observable$ = merge(obs1, obs2);

通过更多的工作,我们甚至可以考虑将valueEmitted全局变量隐藏在闭包中,但这会增加代码的复杂性,并且可能不值得。

于 2018-09-28T08:22:34.363 回答
0

它可以是这样的:

let cancel;
observable$.pipe(
  tap((x)=>clear=setTimeout(()=>console.log(x), 2000)),
   filter(x => x > 5),
   tap(x => clearTimeout(clear)),
   map(x => x + 1)
);
于 2018-09-27T19:51:47.440 回答
0

看一眼

可能是这样的?

--

initiateTimer() {
    if (this.timerSub) {
        this.timerSub.unsubscribe();
    }

    this.timerSub = Rx.Observable.timer(2000)
        .take(1)
        .subscribe(this.showPopup.bind(this));
}
于 2018-09-27T19:42:44.793 回答